Class UNICAST3

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.UNICAST3
All Implemented Interfaces:
Lifecycle, AgeOutCache.Handler<Address>

public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address>
Reliable unicast protocol using a combination of positive and negative acks. See docs/design/UNICAST3.txt for details.
Since:
3.3
  • Field Details

    • DEFAULT_FIRST_SEQNO

      protected static final long DEFAULT_FIRST_SEQNO
      See Also:
    • DEFAULT_XMIT_INTERVAL

      protected static final long DEFAULT_XMIT_INTERVAL
      See Also:
    • conn_expiry_timeout

      protected long conn_expiry_timeout
    • conn_close_timeout

      protected long conn_close_timeout
    • xmit_table_num_rows

      protected int xmit_table_num_rows
    • xmit_table_msgs_per_row

      protected int xmit_table_msgs_per_row
    • xmit_table_resize_factor

      protected double xmit_table_resize_factor
    • xmit_table_max_compaction_time

      protected long xmit_table_max_compaction_time
    • max_retransmit_time

      protected long max_retransmit_time
    • xmit_interval

      protected long xmit_interval
    • xmits_enabled

      protected boolean xmits_enabled
    • log_not_found_msgs

      protected boolean log_not_found_msgs
    • ack_threshold

      protected int ack_threshold
    • sync_min_interval

      protected long sync_min_interval
    • max_xmit_req_size

      protected int max_xmit_req_size
    • max_batch_size

      protected int max_batch_size
    • loopback

      protected boolean loopback
    • num_msgs_sent

      protected final LongAdder num_msgs_sent
    • num_msgs_received

      protected final LongAdder num_msgs_received
    • num_acks_sent

      protected final LongAdder num_acks_sent
    • num_acks_received

      protected final LongAdder num_acks_received
    • num_xmits

      protected final LongAdder num_xmits
    • xmit_reqs_received

      protected final LongAdder xmit_reqs_received
    • xmit_reqs_sent

      protected final LongAdder xmit_reqs_sent
    • xmit_rsps_sent

      protected final LongAdder xmit_rsps_sent
    • avg_delivery_batch_size

      protected final AverageMinMax avg_delivery_batch_size
    • sends_can_block

      protected boolean sends_can_block
    • is_trace

      protected boolean is_trace
    • relay_present

      protected boolean relay_present
    • send_table

      protected final ConcurrentMap<Address,UNICAST3.SenderEntry> send_table
    • recv_table

      protected final ConcurrentMap<Address,UNICAST3.ReceiverEntry> recv_table
    • recv_table_lock

      protected final ReentrantLock recv_table_lock
    • xmit_task_map

      protected final Map<Address,Long> xmit_task_map
      Used by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539
    • xmit_task

      protected Future<?> xmit_task
      RetransmitTask running every xmit_interval ms
    • members

      protected volatile List<Address> members
    • timer

      protected TimeScheduler timer
    • running

      protected volatile boolean running
    • last_conn_id

      protected short last_conn_id
    • cache

      protected AgeOutCache<Address> cache
    • time_service

      protected TimeService time_service
    • timestamper

      protected final AtomicInteger timestamper
    • last_sync_sent

      protected ExpiryCache<Address> last_sync_sent
      Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender
    • num_loopbacks

      protected final LongAdder num_loopbacks
    • msg_cache

      protected final MessageCache msg_cache
    • DUMMY_OOB_MSG

      protected static final Message DUMMY_OOB_MSG
    • drop_oob_and_dont_loopback_msgs_filter

      protected final Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
    • dont_loopback_filter

      protected static final Predicate<Message> dont_loopback_filter
    • BATCH_ACCUMULATOR

      protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
  • Constructor Details

    • UNICAST3

      public UNICAST3()
  • Method Details

    • getNumLoopbacks

      public long getNumLoopbacks()
    • getSendWindow

      public Table<Message> getSendWindow(Address target)
      Used for testing only!
    • getNumSendConnections

      public int getNumSendConnections()
    • getNumReceiveConnections

      public int getNumReceiveConnections()
    • getNumConnections

      public int getNumConnections()
    • getTimestamper

      public int getTimestamper()
    • getAckThreshold

      public int getAckThreshold()
    • setAckThreshold

      public UNICAST3 setAckThreshold(int a)
    • setLevel

      public <T extends Protocol> T setLevel(String level)
      Description copied from class: Protocol
      Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
      Overrides:
      setLevel in class Protocol
      Parameters:
      level - The new level. Valid values are "fatal", "error", "warn", "info", "debug", "trace" (capitalization not relevant)
    • getXmitInterval

      public long getXmitInterval()
    • setXmitInterval

      public UNICAST3 setXmitInterval(long i)
    • isXmitsEnabled

      public boolean isXmitsEnabled()
    • setXmitsEnabled

      public UNICAST3 setXmitsEnabled(boolean b)
    • getXmitTableNumRows

      public int getXmitTableNumRows()
    • setXmitTableNumRows

      public UNICAST3 setXmitTableNumRows(int n)
    • getXmitTableMsgsPerRow

      public int getXmitTableMsgsPerRow()
    • setXmitTableMsgsPerRow

      public UNICAST3 setXmitTableMsgsPerRow(int n)
    • getConnExpiryTimeout

      public long getConnExpiryTimeout()
    • setConnExpiryTimeout

      public UNICAST3 setConnExpiryTimeout(long c)
    • getConnCloseTimeout

      public long getConnCloseTimeout()
    • setConnCloseTimeout

      public UNICAST3 setConnCloseTimeout(long c)
    • getXmitTableResizeFactor

      public double getXmitTableResizeFactor()
    • setXmitTableResizeFactor

      public UNICAST3 setXmitTableResizeFactor(double x)
    • getXmitTableMaxCompactionTime

      public long getXmitTableMaxCompactionTime()
    • setXmitTableMaxCompactionTime

      public UNICAST3 setXmitTableMaxCompactionTime(long x)
    • logNotFoundMsgs

      public boolean logNotFoundMsgs()
    • logNotFoundMsgs

      public UNICAST3 logNotFoundMsgs(boolean l)
    • getSyncMinInterval

      public long getSyncMinInterval()
    • setSyncMinInterval

      public UNICAST3 setSyncMinInterval(long s)
    • getMaxXmitReqSize

      public int getMaxXmitReqSize()
    • setMaxXmitReqSize

      public UNICAST3 setMaxXmitReqSize(int m)
    • sendsCanBlock

      public boolean sendsCanBlock()
    • sendsCanBlock

      public UNICAST3 sendsCanBlock(boolean s)
    • loopback

      public boolean loopback()
    • loopback

      public UNICAST3 loopback(boolean b)
    • printConnections

      public String printConnections()
    • getNumMessagesSent

      @Deprecated public long getNumMessagesSent()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumMessagesReceived

      @Deprecated public long getNumMessagesReceived()
      Deprecated.
      Don't remove! https://issues.redhat.com/browse/JGRP-2814
    • getNumAcksSent

      public long getNumAcksSent()
    • getNumAcksReceived

      public long getNumAcksReceived()
    • getNumXmits

      public long getNumXmits()
    • getMaxRetransmitTime

      public long getMaxRetransmitTime()
    • setMaxRetransmitTime

      public UNICAST3 setMaxRetransmitTime(long max_retransmit_time)
    • isXmitTaskRunning

      public boolean isXmitTaskRunning()
    • getAgeOutCacheSize

      public int getAgeOutCacheSize()
    • printAgeOutCache

      public String printAgeOutCache()
    • getAgeOutCache

      public AgeOutCache<Address> getAgeOutCache()
    • hasSendConnectionTo

      public boolean hasSendConnectionTo(Address dest)
      Used for testing only
    • getNumUnackedMessages

      public int getNumUnackedMessages()
      The number of messages in all Entry.sent_msgs tables (i.e. messages for which no ACK has been received yet)
    • getXmitTableUndeliveredMessages

      public int getXmitTableUndeliveredMessages()
    • getXmitTableMissingMessages

      public int getXmitTableMissingMessages()
    • getXmitTableDeliverableMessages

      public int getXmitTableDeliverableMessages()
    • getXmitTableNumCompactions

      public int getXmitTableNumCompactions()
    • getXmitTableNumMoves

      public int getXmitTableNumMoves()
    • getXmitTableNumResizes

      public int getXmitTableNumResizes()
    • getXmitTableNumPurges

      public int getXmitTableNumPurges()
    • printReceiveWindowMessages

      public String printReceiveWindowMessages()
    • printSendWindowMessages

      public String printSendWindowMessages()
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
    • start

      public void start() throws Exception
      Description copied from class: Protocol
      This method is called on a JChannel.connect(String); starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • up

      public Object up(Message msg)
      Description copied from class: Protocol
      A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
      Overrides:
      up in class Protocol
    • handleUpEvent

      protected void handleUpEvent(Address sender, Message msg, UnicastHeader3 hdr)
    • up

      public void up(MessageBatch batch)
      Description copied from class: Protocol
      Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

      The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

      Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

      Overrides:
      up in class Protocol
      Parameters:
      batch - The message batch
    • handleBatchFromSelf

      protected void handleBatchFromSelf(MessageBatch batch, UNICAST3.Entry entry)
    • down

      public Object down(Event evt)
      Description copied from class: Protocol
      An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
      Overrides:
      down in class Protocol
    • down

      public Object down(Message msg)
      Description copied from class: Protocol
      A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
      Overrides:
      down in class Protocol
    • isLocalSiteMaster

      protected boolean isLocalSiteMaster(Address dest)
    • isLocal

      protected boolean isLocal(Address addr)
    • closeConnection

      public void closeConnection(Address mbr)
      Removes and resets the entry for the given member from the connection table (which is already locked). This method is public only so it can be invoked by unit tests, but should not otherwise be used!
    • closeSendConnection

      public void closeSendConnection(Address mbr)
    • closeReceiveConnection

      public void closeReceiveConnection(Address mbr)
    • removeSendConnection

      public void removeSendConnection(Address mbr)
    • removeSendConnection

      public void removeSendConnection(Predicate<Address> pred)
    • removeReceiveConnection

      public void removeReceiveConnection(Address mbr)
    • removeAllConnections

      public void removeAllConnections()
      This method is public only so it can be invoked by unit tests, but should not otherwise be used!
    • retransmit

      protected void retransmit(SeqnoList missing, Address sender, Address real_dest)
      Sends a retransmit request to the given sender
    • retransmit

      protected void retransmit(Message msg)
      Called by the sender to resend messages for which no ACK has been received yet
    • expired

      public void expired(Address key)
      Called by AgeOutCache, to remove expired connections
      Specified by:
      expired in interface AgeOutCache.Handler<Address>
      Parameters:
      key -
    • handleDataReceived

      protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
      Check whether the hashtable contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
    • addMessage

      protected void addMessage(UNICAST3.ReceiverEntry entry, Address sender, long seqno, Message msg)
    • addQueuedMessages

      protected void addQueuedMessages(Address sender, UNICAST3.ReceiverEntry entry, Collection<Message> queued_msgs)
    • handleDataReceivedFromSelf

      protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
      Called when the sender of a message is the local member. In this case, we don't need to add the message to the table as the sender already did that
    • handleBatchReceived

      protected void handleBatchReceived(UNICAST3.ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob, Address original_dest)
    • removeAndDeliver

      protected void removeAndDeliver(Table<Message> win, Address sender)
      Try to remove as many messages as possible from the table and pass them up. Prevents concurrent passing up of messages by different threads (https://issues.redhat.com/browse/JGRP-198); lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time. We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the order in which they were sent
    • printMessageList

      protected String printMessageList(List<LongTuple<Message>> list)
    • getReceiverEntry

      protected UNICAST3.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address dest)
    • getSenderEntry

      protected UNICAST3.SenderEntry getSenderEntry(Address dst)
    • createReceiverEntry

      protected UNICAST3.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id, Address dest)
    • createTable

      protected Table<Message> createTable(long seqno)
    • handleAckReceived

      protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
      Add the ACK to hashtable.sender.sent_msgs
    • handleResendingOfFirstMessage

      protected void handleResendingOfFirstMessage(Address sender, int timestamp)
      We need to resend the first message with our conn_id
      Parameters:
      sender -
    • handleXmitRequest

      protected void handleXmitRequest(Address sender, SeqnoList missing)
    • resend

      protected void resend(Message msg)
    • deliverMessage

      protected void deliverMessage(Message msg, Address sender, long seqno)
    • deliverBatch

      protected void deliverBatch(MessageBatch batch)
    • getTimestamp

      protected long getTimestamp()
    • startRetransmitTask

      public void startRetransmitTask()
    • stopRetransmitTask

      public void stopRetransmitTask()
    • isCallerRunsHandler

      protected static boolean isCallerRunsHandler(RejectedExecutionHandler h)
    • sendAck

      protected void sendAck(Address dst, long seqno, short conn_id, Address real_dest)
    • getNewConnectionId

      protected short getNewConnectionId()
    • sendRequestForFirstSeqno

      protected void sendRequestForFirstSeqno(Address dest, Address original_dest)
    • sendClose

      public void sendClose(Address dest, short conn_id)
    • closeIdleConnections

      public void closeIdleConnections()
    • removeExpiredConnections

      public int removeExpiredConnections()
    • removeConnections

      public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
      Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).
      Parameters:
      remove_send_connections - If true, send connections whose state is !OPEN are destroyed and removed
      remove_receive_connections - If true, receive connections with state !OPEN are destroyed and removed
      Returns:
      The number of connections which were removed
    • triggerXmit

      public void triggerXmit()
    • sendPendingAcks

      public void sendPendingAcks()
    • update

      protected void update(UNICAST3.Entry entry, int num_received)
    • compare

      protected static int compare(int ts1, int ts2)
      Compares 2 timestamps, handles numeric overflow
    • accumulate

      @SafeVarargs protected static int accumulate(ToIntFunction<Table<Message>> func, Collection<? extends UNICAST3.Entry>... entries)