Class TP

All Implemented Interfaces:
Lifecycle, DiagnosticsHandler.ProbeHandler
Direct Known Subclasses:
BasicTCP, SHARED_LOOPBACK, SimpleTCP, TUNNEL, UDP

public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHandler
Generic transport - specific implementations should extend this abstract class. Features which are provided to the subclasses include
  • version checking
  • marshalling and unmarshalling
  • message bundling (handling single messages, and message lists)
  • incoming packet handler
A subclass has to override The create() or start() method has to create a local address.
The receive(Address, byte[], int, int) method must be called by subclasses when a unicast or multicast message has been received.
  • Field Details

    • LIST

      public static final byte LIST
      See Also:
    • MULTICAST

      public static final byte MULTICAST
      See Also:
    • MSG_OVERHEAD

      public static final int MSG_OVERHEAD
      See Also:
    • MIN_WAIT_BETWEEN_DISCOVERIES

      protected static final long MIN_WAIT_BETWEEN_DISCOVERIES
    • bind_addr

      protected InetAddress bind_addr
    • external_addr

      protected InetAddress external_addr
    • external_port

      protected int external_port
    • is_trace

      protected boolean is_trace
    • receive_on_all_interfaces

      protected boolean receive_on_all_interfaces
    • receive_interfaces

      protected List<NetworkInterface> receive_interfaces
      List of interfaces to receive multicasts on. The multicast receive socket will listen on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once. If this property is set, it overrides receive_on_all_interfaces.
    • logical_addr_cache_max_size

      protected int logical_addr_cache_max_size
    • logical_addr_cache_expiration

      protected long logical_addr_cache_expiration
    • logical_addr_cache_reaper_interval

      protected long logical_addr_cache_reaper_interval
    • bind_port

      protected int bind_port
      The port to which the transport binds. 0 means to bind to any (ephemeral) port. See also port_range
    • port_range

      protected int port_range
    • loopback_copy

      protected boolean loopback_copy
    • loopback_separate_thread

      protected boolean loopback_separate_thread
    • message_processing_policy

      protected String message_processing_policy
    • local_transport_class

      protected String local_transport_class
    • use_virtual_threads

      protected boolean use_virtual_threads
    • thread_naming_pattern

      protected String thread_naming_pattern
    • time_service_interval

      protected long time_service_interval
    • log_discard_msgs

      protected boolean log_discard_msgs
      Whether warnings about messages from different groups are logged - private flag, not for common use
    • log_discard_msgs_version

      protected boolean log_discard_msgs_version
    • who_has_cache_timeout

      protected long who_has_cache_timeout
    • suppress_time_different_version_warnings

      protected long suppress_time_different_version_warnings
    • suppress_time_different_cluster_warnings

      protected long suppress_time_different_cluster_warnings
    • msg_factory_class

      protected String msg_factory_class
    • msg_factory

      protected MessageFactory msg_factory
    • bundler_type

      protected String bundler_type
    • msg_stats

      protected final MsgStats msg_stats
    • cluster_name

      protected AsciiString cluster_name
      The name of the group to which this member is connected. With a shared transport, the channel name is in TP.ProtocolAdapter (cluster_name), and this field is not used
    • timer_handle_non_blocking_tasks

      protected boolean timer_handle_non_blocking_tasks
    • local_physical_addr

      protected PhysicalAddress local_physical_addr
      The address (host and port) of this member
    • view

      protected volatile View view
    • members

      protected final Set<Address> members
      The members of this group (updated when a member joins or leaves). With a shared transport, members contains _all_ members from all channels sitting on the shared transport
    • connectLock

      protected final ReentrantLock connectLock
    • thread_pool

      protected ThreadPool thread_pool
      The thread pool which handles unmarshalling, version checks and dispatching of messages
    • async_executor

      protected AsyncExecutor<Object> async_executor
    • thread_factory

      protected ThreadFactory thread_factory
      Factory which is used by the thread pool
    • timer

      protected TimeScheduler timer
    • time_service

      protected TimeService time_service
    • socket_factory

      protected SocketFactory socket_factory
    • bundler

      protected Bundler bundler
    • msg_processing_policy

      protected MessageProcessingPolicy msg_processing_policy
    • local_transport

      protected LocalTransport local_transport
    • diag_handler

      protected DiagnosticsHandler diag_handler
    • rtt

      protected RTT rtt
    • logical_addr_cache

      protected LazyRemovalCache<Address,PhysicalAddress> logical_addr_cache
      Cache which maintains mappings between logical and physical addresses. When sending a message to a logical address, we look up the physical address from logical_addr_cache and send the message to the physical address

      The keys are logical addresses, the values physical addresses

    • last_discovery_request

      protected long last_discovery_request
    • logical_addr_cache_reaper

      protected Future<?> logical_addr_cache_reaper
    • who_has_cache

      protected ExpiryCache<Address> who_has_cache
      Cache keeping track of WHO_HAS requests for physical addresses (given a logical address) and expiring them after who_has_cache_timeout ms
    • suppress_log_different_version

      protected SuppressLog<Address> suppress_log_different_version
      Log to suppress identical warnings for messages from members with different (incompatible) versions
    • suppress_log_different_cluster

      protected SuppressLog<Address> suppress_log_different_cluster
      Log to suppress identical warnings for messages from members in different clusters
  • Constructor Details

    • TP

      protected TP()
  • Method Details

    • getBundlerClass

      public String getBundlerClass()
    • getMessageFactory

      public MessageFactory getMessageFactory()
    • setMessageFactory

      public <T extends TP> T setMessageFactory(MessageFactory m)
    • getBindAddr

      public InetAddress getBindAddr()
    • setBindAddr

      public <T extends TP> T setBindAddr(InetAddress b)
    • getExternalAddr

      public InetAddress getExternalAddr()
    • setExternalAddr

      public <T extends TP> T setExternalAddr(InetAddress e)
    • getExternalPort

      public int getExternalPort()
    • setExternalPort

      public <T extends TP> T setExternalPort(int e)
    • isTrace

      public boolean isTrace()
    • isTrace

      public <T extends TP> T isTrace(boolean i)
    • receiveOnAllInterfaces

      public boolean receiveOnAllInterfaces()
    • receiveOnAllInterfaces

      public <T extends TP> T receiveOnAllInterfaces(boolean r)
    • getLogicalAddrCacheMaxSize

      public int getLogicalAddrCacheMaxSize()
    • setLogicalAddrCacheMaxSize

      public <T extends TP> T setLogicalAddrCacheMaxSize(int l)
    • getLogicalAddrCacheExpiration

      public long getLogicalAddrCacheExpiration()
    • setLogicalAddrCacheExpiration

      public <T extends TP> T setLogicalAddrCacheExpiration(long l)
    • getLogicalAddrCacheReaperInterval

      public long getLogicalAddrCacheReaperInterval()
    • setLogicalAddrCacheReaperInterval

      public <T extends TP> T setLogicalAddrCacheReaperInterval(long l)
    • loopbackCopy

      public boolean loopbackCopy()
    • loopbackCopy

      public <T extends TP> T loopbackCopy(boolean l)
    • loopbackSeparateThread

      public boolean loopbackSeparateThread()
    • loopbackSeparateThread

      public <T extends TP> T loopbackSeparateThread(boolean l)
    • useVirtualThreads

      public boolean useVirtualThreads()
    • useVirtualThreads

      public <T extends TP> T useVirtualThreads(boolean b)
    • getTimeServiceInterval

      public long getTimeServiceInterval()
    • setTimeServiceInterval

      public <T extends TP> T setTimeServiceInterval(long t)
    • logDiscardMsgs

      public boolean logDiscardMsgs()
    • logDiscardMsgs

      public <T extends TP> T logDiscardMsgs(boolean l)
    • logDiscardMsgsVersion

      public boolean logDiscardMsgsVersion()
    • logDiscardMsgsVersion

      public <T extends TP> T logDiscardMsgsVersion(boolean l)
    • getWhoHasCacheTimeout

      public long getWhoHasCacheTimeout()
    • setWhoHasCacheTimeout

      public <T extends TP> T setWhoHasCacheTimeout(long w)
    • getSuppressTimeDifferentVersionWarnings

      public long getSuppressTimeDifferentVersionWarnings()
    • setSuppressTimeDifferentVersionWarnings

      public <T extends TP> T setSuppressTimeDifferentVersionWarnings(long s)
    • getSuppressTimeDifferentClusterWarnings

      public long getSuppressTimeDifferentClusterWarnings()
    • setSuppressTimeDifferentClusterWarnings

      public <T extends TP> T setSuppressTimeDifferentClusterWarnings(long s)
    • getMsgFactoryClass

      public String getMsgFactoryClass()
    • setMsgFactoryClass

      public <T extends TP> T setMsgFactoryClass(String m)
    • getBundlerType

      public String getBundlerType()
    • setBundlerType

      public <T extends TP> T setBundlerType(String b)
    • getMessageFactoryClass

      public String getMessageFactoryClass()
    • isLogicalAddressCacheReaperRunning

      public boolean isLogicalAddressCacheReaperRunning()
    • setLevel

      public <T extends Protocol> T setLevel(String level)
      Description copied from class: Protocol
      Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
      Overrides:
      setLevel in class Protocol
      Parameters:
      level - The new level. Valid values are "fatal", "error", "warn", "info", "debug", "trace" (capitalization not relevant)
    • setMessageProcessingPolicy

      public void setMessageProcessingPolicy(String policy)
    • getMessageProcessingPolicy

      public MessageProcessingPolicy getMessageProcessingPolicy()
    • getTimerClass

      public String getTimerClass()
    • getClusterName

      public String getClusterName()
    • getClusterNameAscii

      public AsciiString getClusterNameAscii()
    • getDifferentClusterMessages

      public int getDifferentClusterMessages()
    • getDifferentVersionMessages

      public int getDifferentVersionMessages()
    • clearDifferentClusterCache

      public <T extends TP> T clearDifferentClusterCache()
    • clearDifferentVersionCache

      public <T extends TP> T clearDifferentVersionCache()
    • loggerType

      public static String loggerType()
    • enableBlockingTimerTasks

      public <T extends TP> T enableBlockingTimerTasks(boolean flag)
    • getMessageStats

      public MsgStats getMessageStats()
    • getRTT

      public RTT getRTT()
    • enableStats

      public void enableStats(boolean flag)
      Overrides:
      enableStats in class Protocol
    • supportsMulticasting

      public abstract boolean supportsMulticasting()
      Whether hardware multicasting is supported
    • isMulticastCapable

      public boolean isMulticastCapable()
    • getLogicalAddressCache

      public LazyRemovalCache<Address,PhysicalAddress> getLogicalAddressCache()
    • toString

      public String toString()
      Overrides:
      toString in class Protocol
    • setAddress

      public <T extends Protocol> T setAddress(Address addr)
      Overrides:
      setAddress in class Protocol
    • localPhysicalAddress

      public PhysicalAddress localPhysicalAddress()
    • view

      public View view()
    • getLocalPhysicalAddress

      public String getLocalPhysicalAddress()
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • registerProbeHandler

      public <T extends TP> T registerProbeHandler(DiagnosticsHandler.ProbeHandler handler)
    • unregisterProbeHandler

      public <T extends TP> T unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler)
    • getDiagnosticsHandler

      public DiagnosticsHandler getDiagnosticsHandler()
    • setDiagnosticsHandler

      public <T extends TP> T setDiagnosticsHandler(DiagnosticsHandler handler) throws Exception
      Sets a DiagnosticsHandler. Should be set before the stack is started
      Throws:
      Exception
    • getLocalTransport

      public LocalTransport getLocalTransport()
    • setLocalTransport

      public <T extends TP> T setLocalTransport(LocalTransport l)
    • setLocalTransport

      public <T extends TP> T setLocalTransport(String tp_class) throws Exception
      Throws:
      Exception
    • getBundler

      public Bundler getBundler()
    • setBundler

      public <T extends TP> T setBundler(Bundler bundler)
      Installs a bundler
    • getThreadPool

      public ThreadPool getThreadPool()
    • setThreadPool

      public <T extends TP> T setThreadPool(Executor thread_pool)
    • getNumRejectedMsgs

      @Deprecated public long getNumRejectedMsgs()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumberOfThreadDumps

      @Deprecated public long getNumberOfThreadDumps()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumUcastMsgsSent

      @Deprecated public long getNumUcastMsgsSent()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumMcastMsgsSent

      @Deprecated public long getNumMcastMsgsSent()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumUcastMsgsReceived

      @Deprecated public long getNumUcastMsgsReceived()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumMcastMsgsReceived

      @Deprecated public long getNumMcastMsgsReceived()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getThreadFactory

      public ThreadFactory getThreadFactory()
      Description copied from class: Protocol
      Supposed to be overwritten by subclasses. Usually the transport returns a valid non-null thread factory, but thread factories can also be created by individual protocols
      Overrides:
      getThreadFactory in class Protocol
      Returns:
    • setThreadFactory

      public <T extends TP> T setThreadFactory(ThreadFactory factory)
    • getAsyncExecutor

      public AsyncExecutor<Object> getAsyncExecutor()
    • setAsyncExecutor

      public <T extends TP> T setAsyncExecutor(AsyncExecutor<Object> e)
    • getTimer

      public TimeScheduler getTimer()
    • setTimer

      public <T extends TP> T setTimer(TimeScheduler timer)
      Sets a new timer. This should be done before the transport is initialized; be very careful, as replacing a running timer with tasks in it can wreak havoc !
      Parameters:
      timer -
    • getTimeService

      public TimeService getTimeService()
    • setTimeService

      public <T extends TP> T setTimeService(TimeService ts)
    • getSocketFactory

      public SocketFactory getSocketFactory()
      Description copied from class: Protocol
      Returns the SocketFactory associated with this protocol, if overridden in a subclass, or passes the call down
      Overrides:
      getSocketFactory in class Protocol
      Returns:
      SocketFactory
    • setSocketFactory

      public void setSocketFactory(SocketFactory factory)
      Description copied from class: Protocol
      Sets a SocketFactory. Socket factories are typically provided by the transport (TP)
      Overrides:
      setSocketFactory in class Protocol
      Parameters:
      factory -
    • getThreadNamingPattern

      public String getThreadNamingPattern()
      Names the current thread. Valid values are "pcl": p: include the previous (original) name, e.g. "Incoming thread-1", "UDP ucast receiver" c: include the cluster name, e.g. "MyCluster" l: include the local address of the current member, e.g. "192.168.5.1:5678"
    • getBindAddress

      public InetAddress getBindAddress()
    • setBindAddress

      public <T extends TP> T setBindAddress(InetAddress a)
    • getBindPort

      public int getBindPort()
    • setBindPort

      public <T extends TP> T setBindPort(int port)
    • setBindToAllInterfaces

      public <T extends TP> T setBindToAllInterfaces(boolean f)
    • isReceiveOnAllInterfaces

      public boolean isReceiveOnAllInterfaces()
    • getReceiveInterfaces

      public List<NetworkInterface> getReceiveInterfaces()
    • setPortRange

      public <T extends TP> T setPortRange(int range)
    • getPortRange

      public int getPortRange()
    • getNumTimerTasks

      public int getNumTimerTasks()
    • dumpTimerTasks

      public String dumpTimerTasks()
    • removeCancelledTimerTasks

      public void removeCancelledTimerTasks()
    • getTimerThreads

      public int getTimerThreads()
    • getNumThreads

      public static int getNumThreads()
    • setLogDiscardMessages

      public <T extends TP> T setLogDiscardMessages(boolean flag)
    • getLogDiscardMessages

      public boolean getLogDiscardMessages()
    • setLogDiscardMessagesVersion

      public <T extends TP> T setLogDiscardMessagesVersion(boolean f)
    • getLogDiscardMessagesVersion

      public boolean getLogDiscardMessagesVersion()
    • printLogicalAddressCache

      public String printLogicalAddressCache()
    • printWhoHasCache

      public String printWhoHasCache()
    • evictLogicalAddressCache

      public void evictLogicalAddressCache()
    • evictLogicalAddressCache

      public void evictLogicalAddressCache(boolean force)
    • defaultHeaders

      public String defaultHeaders(boolean detailed)
    • sendUnicast

      public abstract void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception
      Send a unicast to a member. Note that the destination address is a *physical*, not a logical address
      Parameters:
      dest - Must be a non-null unicast address
      data - The data to be sent. This is not a copy, so don't modify it
      Throws:
      Exception
    • getInfo

      public abstract String getInfo()
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the the channel constructor will throw an exception
    • start

      public void start() throws Exception
      Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • destroy

      public void destroy()
      Description copied from class: Protocol
      This method is called on a JChannel.close(). Does some cleanup; after the call, the VM will terminate
      Specified by:
      destroy in interface Lifecycle
      Overrides:
      destroy in class Protocol
    • bundler

      public <T extends TP> T bundler(String type) throws Exception
      Throws:
      Exception
    • enableDiagnostics

      public <T extends TP> T enableDiagnostics()
    • disableDiagnostics

      public void disableDiagnostics()
    • startDiagnostics

      protected void startDiagnostics() throws Exception
      Throws:
      Exception
    • stopDiagnostics

      protected void stopDiagnostics()
    • handleProbe

      public Map<String,String> handleProbe(String... keys)
      Description copied from interface: DiagnosticsHandler.ProbeHandler
      Handles a probe. For each key that is handled, the key and its result should be in the returned map.
      Specified by:
      handleProbe in interface DiagnosticsHandler.ProbeHandler
      Returns:
      A map of keys and values. A null return value is permissible.
    • supportedKeys

      public String[] supportedKeys()
      Description copied from interface: DiagnosticsHandler.ProbeHandler
      Returns a list of supported keys
      Specified by:
      supportedKeys in interface DiagnosticsHandler.ProbeHandler
    • handleConnect

      protected void handleConnect() throws Exception
      Throws:
      Exception
    • handleDisconnect

      protected void handleDisconnect()
    • down

      public Object down(Event evt)
      Description copied from class: Protocol
      An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
      Overrides:
      down in class Protocol
    • down

      public Object down(Message msg)
      A message needs to be sent to a single member or all members
      Overrides:
      down in class Protocol
    • createDiagnosticsHandler

      protected DiagnosticsHandler createDiagnosticsHandler()
    • createBundler

      public static Bundler createBundler(String type, Class<?> cl) throws Exception
      Throws:
      Exception
    • loopback

      protected void loopback(Message msg, boolean multicast)
    • _send

      protected void _send(Message msg, Address dest)
    • setSourceAddress

      protected void setSourceAddress(Message msg)
      If the sender is null, set our own address. We cannot just go ahead and set the address anyway, as we might be sending a message on behalf of someone else ! E.g. in case of retransmission, when the original sender has crashed, or in a FLUSH protocol when we have to return all unstable messages with the FLUSH_OK response.
    • passMessageUp

      public void passMessageUp(Message msg, byte[] cluster_name, boolean perform_cluster_name_matching, boolean multicast, boolean discard_own_mcast)
    • passBatchUp

      public void passBatchUp(MessageBatch batch, boolean perform_cluster_name_matching, boolean discard_own_mcast)
    • sameCluster

      protected boolean sameCluster(String req)
    • receive

      public void receive(Address sender, byte[] data, int offset, int length)
      Subclasses must call this method when a unicast or multicast message has been received.
    • receive

      public void receive(Address sender, DataInput in, int ignoredLength) throws Exception
      Throws:
      Exception
    • handleMessageBatch

      protected void handleMessageBatch(DataInput in, boolean multicast, MessageFactory factory)
    • handleSingleMessage

      protected void handleSingleMessage(DataInput in, boolean multicast)
    • processBatch

      protected void processBatch(MessageBatch batch, boolean oob)
    • unicastDestMismatch

      public boolean unicastDestMismatch(Address dest)
    • versionMatch

      protected boolean versionMatch(short version, Address sender)
    • doSend

      public void doSend(byte[] buf, int offset, int length, Address dest) throws Exception
      Throws:
      Exception
    • sendTo

      protected void sendTo(Address dest, byte[] buf, int offset, int length) throws Exception
      Throws:
      Exception
    • sendToAll

      protected void sendToAll(byte[] buf, int offset, int length) throws Exception
      Fetches the physical addrs for all mbrs and sends the msg to each physical address. Asks discovery for missing members' physical addresses if needed
      Throws:
      Exception
    • fetchPhysicalAddrs

      protected void fetchPhysicalAddrs(List<Address> missing)
    • fetchResponsesFromDiscoveryProtocol

      protected Responses fetchResponsesFromDiscoveryProtocol(List<Address> missing)
    • timestamp

      protected long timestamp()
    • registerLocalAddress

      protected void registerLocalAddress(Address addr)
      Associates the address with the physical address fetched from the cache
      Parameters:
      addr -
    • fetchLocalAddresses

      protected void fetchLocalAddresses()
      Grabs the local address (or addresses in the shared transport case) and registers them with the physical address in the transport's cache
    • setThreadNames

      protected void setThreadNames()
    • unsetThreadNames

      protected void unsetThreadNames()
    • setInAllThreadFactories

      protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern)
    • addPhysicalAddressToCache

      public boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress physical_addr)
    • addPhysicalAddressToCache

      protected boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress physical_addr, boolean overwrite)
    • getPhysicalAddressFromCache

      public PhysicalAddress getPhysicalAddressFromCache(Address logical_addr)
    • getAllPhysicalAddressesFromCache

      protected Collection<PhysicalAddress> getAllPhysicalAddressesFromCache()
    • removeLogicalAddressFromCache

      protected void removeLogicalAddressFromCache(Address logical_addr)
    • clearLogicalAddressCache

      public void clearLogicalAddressCache()
      Clears the cache. Do not use, this is only for unit testing !
    • getPhysicalAddress

      protected abstract PhysicalAddress getPhysicalAddress()