Class NAKACK2

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.pbcast.NAKACK2
All Implemented Interfaces:
Lifecycle, DiagnosticsHandler.ProbeHandler

public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno). Receivers deliver messages ordered according to seqno and request retransmission of missing messages.

Retransmit requests are usually sent to the original sender of a message, but this can be changed by xmit_from_random_member (send to random member) or use_mcast_xmit_req (send to everyone). Responses can also be sent to everyone instead of the requester by setting use_mcast_xmit to true.

  • Field Details

    • NUM_REBROADCAST_MSGS

      protected static final int NUM_REBROADCAST_MSGS
      See Also:
    • use_mcast_xmit

      protected boolean use_mcast_xmit
      Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a message, the sender only retransmits once
    • use_mcast_xmit_req

      protected boolean use_mcast_xmit_req
      Use a multicast to request retransmission of missing messages. This may be costly as every member in the cluster will send a response
    • xmit_from_random_member

      protected boolean xmit_from_random_member
      Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be set to false
    • discard_delivered_msgs

      protected boolean discard_delivered_msgs
      Messages that have been received in order are sent up the stack (= delivered to the application). Delivered messages are removed from the retransmit table, so they can get GC'ed by the JVM. When this property is true, everyone (except the sender of a message) removes the message from their retransmit table as soon as it has been delivered to the application
    • max_rebroadcast_timeout

      protected long max_rebroadcast_timeout
    • log_discard_msgs

      protected boolean log_discard_msgs
      If true, logs messages discarded because received from other members
    • log_not_found_msgs

      protected boolean log_not_found_msgs
    • xmit_interval

      protected long xmit_interval
    • xmit_table_num_rows

      protected int xmit_table_num_rows
    • xmit_table_msgs_per_row

      protected int xmit_table_msgs_per_row
    • xmit_table_resize_factor

      protected double xmit_table_resize_factor
    • xmit_table_max_compaction_time

      protected long xmit_table_max_compaction_time
    • become_server_queue_size

      protected int become_server_queue_size
    • suppress_time_non_member_warnings

      protected long suppress_time_non_member_warnings
    • max_xmit_req_size

      protected int max_xmit_req_size
    • max_batch_size

      protected int max_batch_size
    • resend_last_seqno

      protected boolean resend_last_seqno
    • resend_last_seqno_max_times

      protected int resend_last_seqno_max_times
    • sends_can_block

      protected boolean sends_can_block
    • num_messages_sent

      protected int num_messages_sent
    • num_messages_received

      protected int num_messages_received
    • DUMMY_OOB_MSG

      protected static final Message DUMMY_OOB_MSG
    • no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs

      protected final Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
    • dont_loopback_filter

      protected static final Predicate<Message> dont_loopback_filter
    • BATCH_ACCUMULATOR

      protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
    • SEQNO_GETTER

      protected final Function<Message,Long> SEQNO_GETTER
    • HAS_HEADER

      protected final Predicate<Message> HAS_HEADER
    • xmit_reqs_received

      protected final LongAdder xmit_reqs_received
    • xmit_reqs_sent

      protected final LongAdder xmit_reqs_sent
    • xmit_rsps_received

      protected final LongAdder xmit_rsps_received
    • xmit_rsps_sent

      protected final LongAdder xmit_rsps_sent
    • avg_batch_size

      protected final AverageMinMax avg_batch_size
      The average number of messages in a received MessageBatch
    • is_trace

      protected boolean is_trace
    • is_server

      protected volatile boolean is_server
    • members

      protected volatile List<Address> members
    • view

      protected volatile View view
    • seqno

      private final AtomicLong seqno
    • xmit_table

      protected final ConcurrentMap<Address,Table<Message>> xmit_table
      Map to store sent and received messages (keyed by sender)
    • local_xmit_table

      protected Table<Message> local_xmit_table
    • xmit_task

      protected Future<?> xmit_task
      RetransmitTask running every xmit_interval ms
    • xmit_task_map

      protected final Map<Address,Long> xmit_task_map
      Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539)
    • stable_xmit_map

      protected final Map<Address,Long> stable_xmit_map
    • leaving

      protected volatile boolean leaving
    • running

      protected volatile boolean running
    • timer

      protected TimeScheduler timer
    • last_seqno_resender

      protected NAKACK2.LastSeqnoResender last_seqno_resender
    • rebroadcast_lock

      protected final Lock rebroadcast_lock
    • rebroadcast_done

      protected final Condition rebroadcast_done
    • rebroadcasting

      protected volatile boolean rebroadcasting
    • rebroadcast_digest_lock

      protected final Lock rebroadcast_digest_lock
    • rebroadcast_digest

      protected Digest rebroadcast_digest
    • stability_msgs

      protected final BoundedList<String> stability_msgs
      Keeps the last N stability messages
    • digest_history

      protected final BoundedList<String> digest_history
      Keeps a bounded list of the last N digest sets
    • become_server_queue

      protected Queue<Message> become_server_queue
    • suppress_log_non_member

      protected SuppressLog<Address> suppress_log_non_member
      Log to suppress identical warnings for messages from non-members
  • Constructor Details

    • NAKACK2

      public NAKACK2()
  • Method Details

    • isXmitTaskRunning

      public boolean isXmitTaskRunning()
    • getNonMemberMessages

      public int getNonMemberMessages()
    • clearNonMemberCache

      public void clearNonMemberCache()
    • setResendLastSeqno

      public void setResendLastSeqno(boolean flag)
    • resendLastSeqno

      public NAKACK2 resendLastSeqno(boolean flag)
    • resendTaskRunning

      public boolean resendTaskRunning()
    • getXmitRequestsReceived

      public long getXmitRequestsReceived()
    • getXmitRequestsSent

      public long getXmitRequestsSent()
    • getXmitResponsesReceived

      public long getXmitResponsesReceived()
    • getXmitResponsesSent

      public long getXmitResponsesSent()
    • useMcastXmit

      public boolean useMcastXmit()
    • useMcastXmit

      public NAKACK2 useMcastXmit(boolean u)
    • useMcastXmitReq

      public boolean useMcastXmitReq()
    • useMcastXmitReq

      public NAKACK2 useMcastXmitReq(boolean flag)
    • xmitFromRandomMember

      public boolean xmitFromRandomMember()
    • xmitFromRandomMember

      public NAKACK2 xmitFromRandomMember(boolean x)
    • discardDeliveredMsgs

      public boolean discardDeliveredMsgs()
    • discardDeliveredMsgs

      public NAKACK2 discardDeliveredMsgs(boolean d)
    • logDiscardMessages

      public boolean logDiscardMessages()
    • logDiscardMessages

      public NAKACK2 logDiscardMessages(boolean l)
    • logNotFoundMessages

      public boolean logNotFoundMessages()
    • logNotFoundMessages

      public NAKACK2 logNotFoundMessages(boolean flag)
    • setResendLastSeqnoMaxTimes

      public NAKACK2 setResendLastSeqnoMaxTimes(int n)
    • getResendLastSeqnoMaxTimes

      public int getResendLastSeqnoMaxTimes()
    • setXmitFromRandomMember

      public NAKACK2 setXmitFromRandomMember(boolean r)
    • setDiscardDeliveredMsgs

      public NAKACK2 setDiscardDeliveredMsgs(boolean d)
    • getMaxRebroadcastTimeout

      public long getMaxRebroadcastTimeout()
    • setMaxRebroadcastTimeout

      public NAKACK2 setMaxRebroadcastTimeout(long m)
    • getXmitInterval

      public long getXmitInterval()
    • setXmitInterval

      public NAKACK2 setXmitInterval(long x)
    • getXmitTableNumRows

      public int getXmitTableNumRows()
    • setXmitTableNumRows

      public NAKACK2 setXmitTableNumRows(int x)
    • getXmitTableMsgsPerRow

      public int getXmitTableMsgsPerRow()
    • setXmitTableMsgsPerRow

      public NAKACK2 setXmitTableMsgsPerRow(int x)
    • getXmitTableResizeFactor

      public double getXmitTableResizeFactor()
    • setXmitTableResizeFactor

      public NAKACK2 setXmitTableResizeFactor(double x)
    • getXmitTableMaxCompactionTime

      public long getXmitTableMaxCompactionTime()
    • setXmitTableMaxCompactionTime

      public NAKACK2 setXmitTableMaxCompactionTime(long x)
    • getBecomeServerQueueSize

      public int getBecomeServerQueueSize()
    • setBecomeServerQueueSize

      public NAKACK2 setBecomeServerQueueSize(int b)
    • getSuppressTimeNonMemberWarnings

      public long getSuppressTimeNonMemberWarnings()
    • setSuppressTimeNonMemberWarnings

      public NAKACK2 setSuppressTimeNonMemberWarnings(long s)
    • getMaxXmitReqSize

      public int getMaxXmitReqSize()
    • setMaxXmitReqSize

      public NAKACK2 setMaxXmitReqSize(int m)
    • sendsCanBlock

      public boolean sendsCanBlock()
    • sendsCanBlock

      public NAKACK2 sendsCanBlock(boolean s)
    • getNumMessagesSent

      public int getNumMessagesSent()
    • setNumMessagesSent

      public NAKACK2 setNumMessagesSent(int n)
    • getNumMessagesReceived

      public int getNumMessagesReceived()
    • setNumMessagesReceived

      public NAKACK2 setNumMessagesReceived(int n)
    • isTrace

      public boolean isTrace()
    • isTrace

      public NAKACK2 isTrace(boolean i)
    • setLevel

      public <T extends Protocol> T setLevel(String level)
      Description copied from class: Protocol
      Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
      Overrides:
      setLevel in class Protocol
      Parameters:
      level - The new level. Valid values are "fatal", "error", "warn", "info", "debug", "trace" (capitalization not relevant)
    • getBecomeServerQueueSizeActual

      public int getBecomeServerQueueSizeActual()
    • getWindow

      public Table<Message> getWindow(Address sender)
      Returns the receive window for sender; only used for testing. Do not use !
    • setTimer

      public void setTimer(TimeScheduler timer)
      Only used for unit tests, don't use !
    • getXmitTableUndeliveredMsgs

      public int getXmitTableUndeliveredMsgs()
    • getXmitTableMissingMessages

      public int getXmitTableMissingMessages()
    • getXmitTableCapacity

      public long getXmitTableCapacity()
    • getXmitTableNumCurrentRows

      public int getXmitTableNumCurrentRows()
    • getSizeOfAllMessages

      public long getSizeOfAllMessages()
    • getSizeOfAllMessagesInclHeaders

      public long getSizeOfAllMessagesInclHeaders()
    • getXmitTableNumCompactions

      public int getXmitTableNumCompactions()
    • getXmitTableNumMoves

      public int getXmitTableNumMoves()
    • getXmitTableNumResizes

      public int getXmitTableNumResizes()
    • getXmitTableNumPurges

      public int getXmitTableNumPurges()
    • printMessages

      public String printMessages()
    • getCurrentSeqno

      public long getCurrentSeqno()
    • printStabilityMessages

      public String printStabilityMessages()
    • printDigestHistory

      public String printDigestHistory()
    • compact

      public void compact()
    • dumpXmitTablesNumCurrentRows

      public String dumpXmitTablesNumCurrentRows()
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the the channel constructor will throw an exception
    • providedUpServices

      public List<Integer> providedUpServices()
      Description copied from class: Protocol
      List of events that are provided to layers above (they will be handled when sent down from above)
      Overrides:
      providedUpServices in class Protocol
    • start

      public void start() throws Exception
      Description copied from class: Protocol
      This method is called on a JChannel.connect(String); starts work. Protocols are connected ready to receive events. Will be called from bottom to top.
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • down

      public Object down(Event evt)
      Callback. Called by superclass when event may be handled.

      Do not use down_prot.down() in this method as the event is passed down by default by the superclass after this method returns !

      Overrides:
      down in class Protocol
    • down

      public Object down(Message msg)
      Description copied from class: Protocol
      A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
      Overrides:
      down in class Protocol
    • up

      public Object up(Event evt)
      Callback. Called by superclass when event may be handled.

      Do not use passUp in this method as the event is passed up by default by the superclass after this method returns !

      Overrides:
      up in class Protocol
    • up

      public Object up(Message msg)
      Description copied from class: Protocol
      A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
      Overrides:
      up in class Protocol
    • up

      public void up(MessageBatch mb)
      Description copied from class: Protocol
      Sends up a multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

      The default processing below sends messages up the stack individually, based on a matching criteria (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

      Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

      Overrides:
      up in class Protocol
      Parameters:
      mb - The message batch
    • handleProbe

      public Map<String,String> handleProbe(String... keys)
      Description copied from interface: DiagnosticsHandler.ProbeHandler
      Handles a probe. For each key that is handled, the key and its result should be in the returned map.
      Specified by:
      handleProbe in interface DiagnosticsHandler.ProbeHandler
      Returns:
      A map of keys and values. A null return value is permissible.
    • supportedKeys

      public String[] supportedKeys()
      Description copied from interface: DiagnosticsHandler.ProbeHandler
      Returns a list of supported keys
      Specified by:
      supportedKeys in interface DiagnosticsHandler.ProbeHandler
    • queueMessage

      protected void queueMessage(Message msg, long seqno)
    • unknownMember

      protected void unknownMember(Address sender)
    • send

      protected void send(Message msg)
      Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details. Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to sent_msgs fails e.g. due to an OOM (see https://issues.redhat.com/browse/JGRP-179). bela Jan 13 2006
    • resend

      protected void resend(Message msg)
    • handleMessage

      protected void handleMessage(Message msg, NakAckHeader2 hdr)
      Finds the corresponding retransmit buffer and adds the message to it (according to seqno). Then removes as many messages as possible and passes them up the stack. Discards messages from non-members.
    • handleMessageBatch

      protected void handleMessageBatch(MessageBatch mb)
    • removeAndDeliver

      protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name)
      Efficient way of checking whether another thread is already processing messages from sender. If that's the case, we return immediately and let the existing thread process our message (https://issues.redhat.com/browse/JGRP-829). Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool
    • handleXmitReq

      protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender)
      Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.
      Parameters:
      xmit_requester - The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
      missing_msgs - A list of seqnos that have to be retransmitted
      original_sender - The member who originally sent the messsage. Guaranteed to be non-null
    • deliver

      protected void deliver(Message msg, Address sender, long seqno, String error_msg)
    • deliverBatch

      protected void deliverBatch(MessageBatch batch)
    • flushBecomeServerQueue

      protected void flushBecomeServerQueue()
      Flushes the queue. Done in a separate thread as we don't want to block the GMS.installView(org.jgroups.View,org.jgroups.util.Digest) method (called when a view is installed).
    • cancelRebroadcasting

      protected void cancelRebroadcasting()
    • sendXmitRsp

      protected void sendXmitRsp(Address dest, Message msg)
      Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need to preserve the original message's properties, such as src, headers etc.
      Parameters:
      dest -
      msg -
    • handleXmitRsp

      protected void handleXmitRsp(Message msg, NakAckHeader2 hdr)
    • handleHighestSeqno

      protected void handleHighestSeqno(Address sender, long seqno)
      Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmission
      Parameters:
      sender - The sender
      seqno - The highest seqno sent by sender
    • msgFromXmitRsp

      protected Message msgFromXmitRsp(Message msg, NakAckHeader2 hdr)
    • rebroadcastMessages

      protected void rebroadcastMessages()
      Takes the argument highest_seqnos and compares it to the current digest. If the current digest has fewer messages, then send retransmit messages for the missing messages. Return when all missing messages have been received. If we're waiting for a missing message from P, and P crashes while waiting, we need to exclude P from the wait set.
    • checkForRebroadcasts

      protected void checkForRebroadcasts()
    • isGreaterThanOrEqual

      protected static boolean isGreaterThanOrEqual(Digest first, Digest other)
      Returns true if all senders of the current digest have their seqnos >= the ones from other
    • isCallerRunsHandler

      protected static boolean isCallerRunsHandler(RejectedExecutionHandler h)
    • adjustReceivers

      protected void adjustReceivers(List<Address> members)
      Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0). This method is not called concurrently
    • getDigest

      public Digest getDigest()
      Returns a message digest: for each member P the highest delivered and received seqno is added
    • getDigest

      public Digest getDigest(Address mbr)
    • setDigest

      protected void setDigest(Digest digest)
      Creates a retransmit buffer for each sender in the digest according to the sender's seqno. If a buffer already exists, it resets it.
    • mergeDigest

      protected void mergeDigest(Digest digest)
      For all members of the digest, adjust the retransmit buffers in xmit_table. If no entry exists, create one with the initial seqno set to the seqno of the member in the digest. If the member already exists, and is not the local address, replace it with the new entry (https://issues.redhat.com/browse/JGRP-699) if the digest's seqno is greater than the seqno in the window.
    • overwriteDigest

      protected void overwriteDigest(Digest digest)
      Overwrites existing entries, but does NOT remove entries not found in the digest
      Parameters:
      digest -
    • setDigest

      protected void setDigest(Digest digest, boolean merge)
      Sets or merges the digest. If there is no entry for a given member in xmit_table, create a new buffer. Else skip the existing entry, unless it is a merge. In this case, skip the existing entry if its seqno is greater than or equal to the one in the digest, or reset the window and create a new one if not.
      Parameters:
      digest - The digest
      merge - Whether to merge the new digest with our own, or not
    • createTable

      protected Table<Message> createTable(long initial_seqno)
    • stable

      protected void stable(Digest digest)
      Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update xmit_table: for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the retransmit buffer corresponding to P which are <= seqno at digest[P].
    • retransmit

      protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request)
    • retransmit

      protected void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request)
    • reset

      protected void reset()
    • sizeOfAllMessages

      protected static long sizeOfAllMessages(Table<Message> buf, boolean include_headers)
    • startRetransmitTask

      protected void startRetransmitTask()
    • stopRetransmitTask

      protected void stopRetransmitTask()
    • triggerXmit

      public void triggerXmit()