Class NAKACK2
- All Implemented Interfaces:
Lifecycle, DiagnosticsHandler.ProbeHandler
Retransmit requests are usually sent to the original sender of a message, but this can be changed by xmit_from_random_member (send to random member) or use_mcast_xmit_req (send to everyone). Responses can also be sent to everyone instead of the requester by setting use_mcast_xmit to true.
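The choice of destination described above can be modeled in a few lines. This is a minimal sketch, not JGroups code: the class `XmitTargetSketch`, the `MULTICAST` marker and the use of plain strings as addresses are hypothetical, and the precedence between the two flags (multicast wins over random member) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Simplified model of picking the destination of a retransmit request. */
final class XmitTargetSketch {
    /** Hypothetical marker meaning "send to every member". */
    static final String MULTICAST = "<all members>";

    static String chooseTarget(String originalSender, String self, List<String> members,
                               boolean useMcastXmitReq, boolean xmitFromRandomMember, Random rnd) {
        if (useMcastXmitReq)
            return MULTICAST;                       // use_mcast_xmit_req: ask everyone
        if (xmitFromRandomMember) {
            // xmit_from_random_member: ask any member other than ourselves
            List<String> candidates = new ArrayList<>(members);
            candidates.remove(self);
            if (!candidates.isEmpty())
                return candidates.get(rnd.nextInt(candidates.size()));
        }
        return originalSender;                      // default: ask the original sender
    }

    public static void main(String[] args) {
        List<String> members = List.of("A", "B", "C");
        Random rnd = new Random();
        System.out.println(chooseTarget("A", "B", members, false, false, rnd));
        System.out.println(chooseTarget("A", "B", members, true, false, rnd));
        System.out.println(chooseTarget("A", "B", members, false, true, rnd));
    }
}
```

The response side (use_mcast_xmit) is a separate switch and is not modeled here.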
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
protected class: Class which is called by RetransmitTask to resend the last seqno sent (if resend_last_seqno is enabled)
protected class: Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and sends retransmit requests to all members from which we have missing messages
-
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final AverageMinMaxThe average number of messages in a receivedMessageBatchprotected static final BiConsumer<MessageBatch, Message> protected intprotected final BoundedList<String> Keeps a bounded list of the last N digest setsprotected booleanMessages that have been received in order are sent up the stack (= delivered to the application).protected static final Messageprotected booleanprotected booleanprotected NAKACK2.LastSeqnoResenderprotected booleanprotected booleanIf true, logs messages discarded because received from other membersprotected booleanprotected intprotected longprotected intprotected intprotected intprotected static final intprotected Digestprotected final Lockprotected final Conditionprotected final Lockprotected booleanprotected booleanprotected intprotected booleanprotected booleanprivate final AtomicLongprotected final BoundedList<String> Keeps the last N stability messagesprotected SuppressLog<Address> Log to suppress identical warnings for messages from non-membersprotected longprotected TimeSchedulerprotected booleanRetransmit messages using multicast rather than unicast.protected booleanUse a multicast to request retransmission of missing messages.protected Viewprotected booleanAsk a random member for retransmission of a missing message.protected longprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final ConcurrentMap<Address, Table<Message>> Map to store sent and received messages (keyed by sender)protected longprotected intprotected intprotected doubleprotected Future<?> RetransmitTask running every xmit_interval msUsed by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539)Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected voidadjustReceivers(List<Address> members) Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0).protected voidprotected voidvoidvoidcompact()createTable(long initial_seqno) protected voidprotected voiddeliverBatch(MessageBatch batch) booleandiscardDeliveredMsgs(boolean d) Callback.A message is sent down the stack.protected voidFlushes the queue.intintlongReturns a message digest: for each member P the highest delivered and received seqno is addedlongintintintintintlonglonglongReturns the receive window for sender; only used for testing.longlonglonglonglonglonglongintintintintintintintintdoubleintprotected voidhandleHighestSeqno(Address sender, long seqno) Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmissionprotected voidhandleMessage(Message msg, NakAckHeader2 hdr) Finds the corresponding retransmit buffer and adds the message to it (according to seqno).protected voidhandleProbe(String... 
keys) Handles a probe.protected voidhandleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.protected voidhandleXmitRsp(Message msg, NakAckHeader2 hdr) voidinit()Called after a protocol has been created and before the protocol is started.protected static booleanprotected static booleanisGreaterThanOrEqual(Digest first, Digest other) Returns true if all senders of the current digest have their seqnos >= the ones from otherbooleanisTrace()isTrace(boolean i) booleanbooleanlogDiscardMessages(boolean l) booleanlogNotFoundMessages(boolean flag) protected voidmergeDigest(Digest digest) For all members of the digest, adjust the retransmit buffers in xmit_table.protected MessagemsgFromXmitRsp(Message msg, NakAckHeader2 hdr) protected voidoverwriteDigest(Digest digest) Overwrites existing entries, but does NOT remove entries not found in the digestList of events that are provided to layers above (they will be handled when sent down from above)protected voidqueueMessage(Message msg, long seqno) protected voidTakes the argument highest_seqnos and compares it to the current digest.protected voidremoveAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) Efficient way of checking whether another thread is already processing messages from sender.protected voidresendLastSeqno(boolean flag) booleanprotected voidreset()voidprotected voidretransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request) protected voidretransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request) protected voidAdds the message to the sent_msgs table and then passes it down the stack.booleansendsCanBlock(boolean s) protected voidsendXmitRsp(Address dest, Message msg) Sends a message msg to the requester.setBecomeServerQueueSize(int b) protected 
voidCreates a retransmit buffer for each sender in the digest according to the sender's seqno.protected voidSets or merges the digest.setDiscardDeliveredMsgs(boolean d) <T extends Protocol>
TSets the level of a logger.setMaxRebroadcastTimeout(long m) setMaxXmitReqSize(int m) setNumMessagesReceived(int n) setNumMessagesSent(int n) voidsetResendLastSeqno(boolean flag) setResendLastSeqnoMaxTimes(int n) setSuppressTimeNonMemberWarnings(long s) voidsetTimer(TimeScheduler timer) Only used for unit tests, don't use !setXmitFromRandomMember(boolean r) setXmitInterval(long x) setXmitTableMaxCompactionTime(long x) setXmitTableMsgsPerRow(int x) setXmitTableNumRows(int x) setXmitTableResizeFactor(double x) protected static longsizeOfAllMessages(Table<Message> buf, boolean include_headers) protected voidGarbage collect messages that have been seen by all members.voidstart()This method is called on aJChannel.connect(String); starts work.protected voidvoidstop()Called on aJChannel.disconnect(); stops work (e.g.protected voidString[]Returns a list of supported keysvoidprotected voidunknownMember(Address sender) Callback.A single message was received.voidup(MessageBatch mb) Sends up a multiple messages in aMessageBatch.booleanuseMcastXmit(boolean u) booleanuseMcastXmitReq(boolean flag) booleanxmitFromRandomMember(boolean x) Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, removePolicy, requiredDownServices, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString
-
Field Details
-
NUM_REBROADCAST_MSGS
protected static final int NUM_REBROADCAST_MSGS
-
use_mcast_xmit
protected boolean use_mcast_xmit
Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a message, the sender only retransmits once.
-
use_mcast_xmit_req
protected boolean use_mcast_xmit_req
Use a multicast to request retransmission of missing messages. This may be costly as every member in the cluster will send a response.
-
xmit_from_random_member
protected boolean xmit_from_random_member
Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be set to false.
-
discard_delivered_msgs
protected boolean discard_delivered_msgs
Messages that have been received in order are sent up the stack (= delivered to the application). Delivered messages are removed from the retransmit table, so they can get GC'ed by the JVM. When this property is true, everyone (except the sender of a message) removes the message from their retransmit table as soon as it has been delivered to the application.
-
max_rebroadcast_timeout
protected long max_rebroadcast_timeout -
log_discard_msgs
protected boolean log_discard_msgs
If true, logs messages that are discarded because they were received from other members.
-
log_not_found_msgs
protected boolean log_not_found_msgs -
xmit_interval
protected long xmit_interval -
xmit_table_num_rows
protected int xmit_table_num_rows -
xmit_table_msgs_per_row
protected int xmit_table_msgs_per_row -
xmit_table_resize_factor
protected double xmit_table_resize_factor -
xmit_table_max_compaction_time
protected long xmit_table_max_compaction_time -
become_server_queue_size
protected int become_server_queue_size -
suppress_time_non_member_warnings
protected long suppress_time_non_member_warnings -
max_xmit_req_size
protected int max_xmit_req_size -
max_batch_size
protected int max_batch_size -
resend_last_seqno
protected boolean resend_last_seqno -
resend_last_seqno_max_times
protected int resend_last_seqno_max_times -
sends_can_block
protected boolean sends_can_block -
num_messages_sent
protected int num_messages_sent -
num_messages_received
protected int num_messages_received -
DUMMY_OOB_MSG
-
no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
-
dont_loopback_filter
-
BATCH_ACCUMULATOR
-
SEQNO_GETTER
-
HAS_HEADER
-
xmit_reqs_received
-
xmit_reqs_sent
-
xmit_rsps_received
-
xmit_rsps_sent
-
avg_batch_size
The average number of messages in a received MessageBatch -
is_trace
protected boolean is_trace -
is_server
protected volatile boolean is_server -
members
-
view
-
seqno
-
xmit_table
Map to store sent and received messages (keyed by sender) -
local_xmit_table
-
xmit_task
RetransmitTask running every xmit_interval ms -
xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) -
stable_xmit_map
-
leaving
protected volatile boolean leaving -
running
protected volatile boolean running -
timer
-
last_seqno_resender
-
rebroadcast_lock
-
rebroadcast_done
-
rebroadcasting
protected volatile boolean rebroadcasting -
rebroadcast_digest_lock
-
rebroadcast_digest
-
stability_msgs
Keeps the last N stability messages -
digest_history
Keeps a bounded list of the last N digest sets -
become_server_queue
-
suppress_log_non_member
Log to suppress identical warnings for messages from non-members
-
-
Constructor Details
-
NAKACK2
public NAKACK2()
-
-
Method Details
-
isXmitTaskRunning
public boolean isXmitTaskRunning() -
getNonMemberMessages
public int getNonMemberMessages() -
clearNonMemberCache
public void clearNonMemberCache() -
setResendLastSeqno
public void setResendLastSeqno(boolean flag) -
resendLastSeqno
-
resendTaskRunning
public boolean resendTaskRunning() -
getXmitRequestsReceived
public long getXmitRequestsReceived() -
getXmitRequestsSent
public long getXmitRequestsSent() -
getXmitResponsesReceived
public long getXmitResponsesReceived() -
getXmitResponsesSent
public long getXmitResponsesSent() -
useMcastXmit
public boolean useMcastXmit() -
useMcastXmit
-
useMcastXmitReq
public boolean useMcastXmitReq() -
useMcastXmitReq
-
xmitFromRandomMember
public boolean xmitFromRandomMember() -
xmitFromRandomMember
-
discardDeliveredMsgs
public boolean discardDeliveredMsgs() -
discardDeliveredMsgs
-
logDiscardMessages
public boolean logDiscardMessages() -
logDiscardMessages
-
logNotFoundMessages
public boolean logNotFoundMessages() -
logNotFoundMessages
-
setResendLastSeqnoMaxTimes
-
getResendLastSeqnoMaxTimes
public int getResendLastSeqnoMaxTimes() -
setXmitFromRandomMember
-
setDiscardDeliveredMsgs
-
getMaxRebroadcastTimeout
public long getMaxRebroadcastTimeout() -
setMaxRebroadcastTimeout
-
getXmitInterval
public long getXmitInterval() -
setXmitInterval
-
getXmitTableNumRows
public int getXmitTableNumRows() -
setXmitTableNumRows
-
getXmitTableMsgsPerRow
public int getXmitTableMsgsPerRow() -
setXmitTableMsgsPerRow
-
getXmitTableResizeFactor
public double getXmitTableResizeFactor() -
setXmitTableResizeFactor
-
getXmitTableMaxCompactionTime
public long getXmitTableMaxCompactionTime() -
setXmitTableMaxCompactionTime
-
getBecomeServerQueueSize
public int getBecomeServerQueueSize() -
setBecomeServerQueueSize
-
getSuppressTimeNonMemberWarnings
public long getSuppressTimeNonMemberWarnings() -
setSuppressTimeNonMemberWarnings
-
getMaxXmitReqSize
public int getMaxXmitReqSize() -
setMaxXmitReqSize
-
sendsCanBlock
public boolean sendsCanBlock() -
sendsCanBlock
-
getNumMessagesSent
public int getNumMessagesSent() -
setNumMessagesSent
-
getNumMessagesReceived
public int getNumMessagesReceived() -
setNumMessagesReceived
-
isTrace
public boolean isTrace() -
isTrace
-
setLevel
Description copied from class: Protocol
Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
-
getBecomeServerQueueSizeActual
public int getBecomeServerQueueSizeActual() -
getWindow
Returns the receive window for sender; only used for testing. Do not use! -
setTimer
Only used for unit tests, don't use! -
getXmitTableUndeliveredMsgs
public int getXmitTableUndeliveredMsgs() -
getXmitTableMissingMessages
public int getXmitTableMissingMessages() -
getXmitTableCapacity
public long getXmitTableCapacity() -
getXmitTableNumCurrentRows
public int getXmitTableNumCurrentRows() -
getSizeOfAllMessages
public long getSizeOfAllMessages() -
getSizeOfAllMessagesInclHeaders
public long getSizeOfAllMessagesInclHeaders() -
getXmitTableNumCompactions
public int getXmitTableNumCompactions() -
getXmitTableNumMoves
public int getXmitTableNumMoves() -
getXmitTableNumResizes
public int getXmitTableNumResizes() -
getXmitTableNumPurges
public int getXmitTableNumPurges() -
printMessages
-
getCurrentSeqno
public long getCurrentSeqno() -
printStabilityMessages
-
printDigestHistory
-
compact
public void compact() -
dumpXmitTablesNumCurrentRows
-
resetStats
public void resetStats()
Overrides: resetStats in class Protocol
-
init
Description copied from class: Protocol
Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
-
providedUpServices
Description copied from class: Protocol
List of events that are provided to layers above (they will be handled when sent down from above)
Overrides: providedUpServices in class Protocol
-
start
Description copied from class: Protocol
This method is called on a JChannel.connect(String); starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
-
stop
public void stop()
Description copied from class: Protocol
Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
-
down
Callback. Called by superclass when event may be handled. Do not use down_prot.down() in this method as the event is passed down by default by the superclass after this method returns!
-
down
Description copied from class: Protocol
A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
-
up
Callback. Called by superclass when event may be handled. Do not use passUp in this method as the event is passed up by default by the superclass after this method returns!
-
up
Description copied from class: Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
up
Description copied from class: Protocol
Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and, if it returns true, calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
handleProbe
Description copied from interface: DiagnosticsHandler.ProbeHandler
Handles a probe. For each key that is handled, the key and its result should be in the returned map.
Specified by: handleProbe in interface DiagnosticsHandler.ProbeHandler
Returns: A map of keys and values. A null return value is permissible.
-
supportedKeys
Description copied from interface: DiagnosticsHandler.ProbeHandler
Returns a list of supported keys
Specified by: supportedKeys in interface DiagnosticsHandler.ProbeHandler
-
queueMessage
-
unknownMember
-
send
Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details. Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to sent_msgs fails e.g. due to an OOM (see https://issues.redhat.com/browse/JGRP-179). bela Jan 13 2006 -
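The atomicity note above (the seqno must not advance if storing the message fails) can be illustrated as follows. This is a minimal sketch under stated assumptions, not the NAKACK2 implementation: `SendSketch`, its lock-based design and the plain `Map` standing in for the sent-messages table are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified model: assign a seqno and store the message as one atomic step. */
final class SendSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<Long, Object> sentMsgs;   // stand-in for the sent-messages table
    private long seqno;                         // last seqno successfully assigned

    SendSketch(Map<Long, Object> sentMsgs) {
        this.sentMsgs = sentMsgs;
    }

    /** Stores a reference to msg (not a copy) and returns the seqno assigned to it. */
    long send(Object msg) {
        lock.lock();
        try {
            long next = seqno + 1;
            sentMsgs.put(next, msg);   // may throw; seqno is still unchanged at this point
            seqno = next;              // only advance once the message is stored
            return next;
        } finally {
            lock.unlock();
        }
    }

    long currentSeqno() {
        lock.lock();
        try {
            return seqno;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SendSketch sender = new SendSketch(new HashMap<>());
        System.out.println(sender.send("first"));
        System.out.println(sender.send("second"));
    }
}
```

Because only a reference is stored, a caller of such a method must not modify the message after sending it, which is the same caveat the description gives.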
resend
-
handleMessage
Finds the corresponding retransmit buffer and adds the message to it (according to seqno). Then removes as many messages as possible and passes them up the stack. Discards messages from non-members. -
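The add-then-deliver-in-order behaviour described above can be sketched with a sorted map. This is an illustration only: `ReceiveBufferSketch` is a hypothetical stand-in for a per-sender retransmit buffer, messages are plain strings, and the discarding of messages from non-members is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified per-sender receive buffer: stores by seqno, releases in order. */
final class ReceiveBufferSketch {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long highestDelivered;

    ReceiveBufferSketch(long highestDelivered) {
        this.highestDelivered = highestDelivered;
    }

    /** Adds a message; returns false if it was already delivered or is a duplicate. */
    boolean add(long seqno, String msg) {
        if (seqno <= highestDelivered)
            return false;
        return pending.putIfAbsent(seqno, msg) == null;
    }

    /** Removes and returns the longest gap-free run starting at highestDelivered + 1. */
    List<String> removeDeliverable() {
        List<String> deliverable = new ArrayList<>();
        String next;
        while ((next = pending.remove(highestDelivered + 1)) != null) {
            deliverable.add(next);
            highestDelivered++;
        }
        return deliverable;
    }

    public static void main(String[] args) {
        ReceiveBufferSketch buf = new ReceiveBufferSketch(0);
        buf.add(2, "m2");                             // arrives early, seqno 1 is missing
        System.out.println(buf.removeDeliverable());  // nothing deliverable yet
        buf.add(1, "m1");                             // the gap is filled
        System.out.println(buf.removeDeliverable());  // both messages, in seqno order
    }
}
```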
handleMessageBatch
-
removeAndDeliver
protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name)
Efficient way of checking whether another thread is already processing messages from sender. If that's the case, we return immediately and let the existing thread process our message (https://issues.redhat.com/browse/JGRP-829). Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool.
-
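The "let the thread that is already there do the work" idea can be sketched with a compare-and-set flag. This is a hedged illustration, not the mechanism NAKACK2 itself uses: `SingleDrainerSketch`, the `AtomicBoolean` flag and the string messages are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Simplified model: at most one thread at a time drains a sender's buffer. */
final class SingleDrainerSketch {
    private final Queue<String> buf = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processing = new AtomicBoolean(false);

    /** Adds msg; returns true if this call delivered messages, false if another caller does so. */
    boolean addAndDeliver(String msg, Consumer<String> up) {
        buf.add(msg);
        boolean delivered = false;
        do {
            if (!processing.compareAndSet(false, true))
                return delivered;              // someone else is draining: return immediately
            try {
                String m;
                while ((m = buf.poll()) != null)
                    up.accept(m);
                delivered = true;
            } finally {
                processing.set(false);
            }
            // Re-check: a message may have been added after the last poll() above
        } while (!buf.isEmpty());
        return delivered;
    }

    public static void main(String[] args) {
        SingleDrainerSketch s = new SingleDrainerSketch();
        List<String> out = new ArrayList<>();
        s.addAndDeliver("m1", out::add);
        s.addAndDeliver("m2", out::add);
        System.out.println(out);
    }
}
```

The re-check after releasing the flag closes the window in which a message is queued just as the draining thread finishes.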
handleXmitReq
protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender)
Retransmits messages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester; called when XMIT_REQ is received.
Parameters:
xmit_requester - The sender of the XMIT_REQ; we have to send the requested copy of the message to this address
missing_msgs - A list of seqnos that have to be retransmitted
original_sender - The member who originally sent the message. Guaranteed to be non-null
-
deliver
-
deliverBatch
-
flushBecomeServerQueue
protected void flushBecomeServerQueue()
Flushes the queue. Done in a separate thread as we don't want to block the GMS.installView(org.jgroups.View,org.jgroups.util.Digest) method (called when a view is installed).
-
cancelRebroadcasting
protected void cancelRebroadcasting() -
sendXmitRsp
Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need to preserve the original message's properties, such as src, headers etc.
Parameters:
dest -
msg -
-
handleXmitRsp
-
handleHighestSeqno
Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmission
Parameters:
sender - The sender
seqno - The highest seqno sent by sender
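The comparison described above boils down to computing which seqnos are missing. A minimal sketch, assuming seqnos are contiguous longs; `HighestSeqnoSketch` and `missingRange` are hypothetical names, and the real method works on the protocol's own buffers rather than on two numbers.

```java
/** Simplified model of deciding what to ask a sender to retransmit. */
final class HighestSeqnoSketch {
    /**
     * Returns {first, last} of the seqnos to request (both inclusive),
     * or an empty array if the sender is not ahead of us.
     */
    static long[] missingRange(long myHighestReceived, long senderHighest) {
        if (senderHighest <= myHighestReceived)
            return new long[0];
        return new long[] { myHighestReceived + 1, senderHighest };
    }

    public static void main(String[] args) {
        long[] range = missingRange(7, 10);
        System.out.println(range[0] + " to " + range[1]);
    }
}
```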
-
msgFromXmitRsp
-
rebroadcastMessages
protected void rebroadcastMessages()
Takes the argument highest_seqnos and compares it to the current digest. If the current digest has fewer messages, then send retransmit messages for the missing messages. Return when all missing messages have been received. If we're waiting for a missing message from P, and P crashes while waiting, we need to exclude P from the wait set.
-
checkForRebroadcasts
protected void checkForRebroadcasts() -
isGreaterThanOrEqual
Returns true if all senders of the current digest have their seqnos >= the ones from other -
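A digest comparison of this kind can be sketched with maps from member to highest seqno. This is a simplified model, not the JGroups Digest API: `DigestCompareSketch` is hypothetical, each member carries a single seqno here, and skipping senders that are absent from `other` is an assumption.

```java
import java.util.Map;

/** Simplified model of comparing two digests member by member. */
final class DigestCompareSketch {
    /** True if, for every sender in first that other also knows, first's seqno >= other's. */
    static boolean isGreaterThanOrEqual(Map<String, Long> first, Map<String, Long> other) {
        for (Map.Entry<String, Long> e : first.entrySet()) {
            Long theirs = other.get(e.getKey());
            if (theirs == null)
                continue;                     // assumption: senders unknown to other are skipped
            if (e.getValue() < theirs)
                return false;                 // we are behind for this sender
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> mine = Map.of("A", 5L, "B", 7L);
        System.out.println(isGreaterThanOrEqual(mine, Map.of("A", 5L, "B", 6L)));
        System.out.println(isGreaterThanOrEqual(mine, Map.of("A", 9L)));
    }
}
```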
isCallerRunsHandler
-
adjustReceivers
Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0). This method is not called concurrently -
getDigest
Returns a message digest: for each member P the highest delivered and received seqno is added -
getDigest
-
setDigest
Creates a retransmit buffer for each sender in the digest according to the sender's seqno. If a buffer already exists, it resets it. -
mergeDigest
For all members of the digest, adjust the retransmit buffers in xmit_table. If no entry exists, create one with the initial seqno set to the seqno of the member in the digest. If the member already exists, and is not the local address, replace it with the new entry (https://issues.redhat.com/browse/JGRP-699) if the digest's seqno is greater than the seqno in the window. -
overwriteDigest
Overwrites existing entries, but does NOT remove entries not found in the digest
Parameters:
digest -
-
setDigest
Sets or merges the digest. If there is no entry for a given member in xmit_table, create a new buffer. Else skip the existing entry, unless it is a merge. In this case, skip the existing entry if its seqno is greater than or equal to the one in the digest, or reset the window and create a new one if not.
Parameters:
digest - The digest
merge - Whether to merge the new digest with our own, or not
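The set-versus-merge rule described above can be written out as a small decision per member. In this hedged sketch each retransmit buffer is reduced to its seqno; `SetDigestSketch` and the map-based table are hypothetical, and special handling of the local address is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of applying a digest to a table of per-member buffers. */
final class SetDigestSketch {
    static void setDigest(Map<String, Long> table, Map<String, Long> digest, boolean merge) {
        for (Map.Entry<String, Long> e : digest.entrySet()) {
            Long existing = table.get(e.getKey());
            if (existing == null)
                table.put(e.getKey(), e.getValue());   // no entry yet: create a new buffer
            else if (merge && existing < e.getValue())
                table.put(e.getKey(), e.getValue());   // merge and we are behind: reset the buffer
            // otherwise the existing entry is kept as it is
        }
    }

    public static void main(String[] args) {
        Map<String, Long> table = new HashMap<>(Map.of("A", 5L, "B", 2L));
        setDigest(table, Map.of("A", 3L, "B", 9L, "C", 1L), true);
        System.out.println(table);
    }
}
```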
-
createTable
-
stable
Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update xmit_table: for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the retransmit buffer corresponding to P which are <= seqno at digest[P]. -
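The garbage collection step described above amounts to purging, per sender, everything at or below the seqno in the stability digest. A minimal sketch under stated assumptions: `StableSketch` is hypothetical, buffers are sorted maps of strings, and every message at or below the stable seqno is assumed to have been delivered already.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Simplified model of purging messages that all members have seen. */
final class StableSketch {
    /** Removes all entries with seqno <= stableSeqno and returns how many were purged. */
    static int purge(NavigableMap<Long, String> buffer, long stableSeqno) {
        NavigableMap<Long, String> stablePart = buffer.headMap(stableSeqno, true);
        int purged = stablePart.size();
        stablePart.clear();                  // headMap is a view, so this shrinks buffer
        return purged;
    }

    /** Applies a stability digest (member -> highest seqno seen by all) to every buffer. */
    static void stable(Map<String, NavigableMap<Long, String>> xmitTable, Map<String, Long> digest) {
        for (Map.Entry<String, Long> e : digest.entrySet()) {
            NavigableMap<Long, String> buffer = xmitTable.get(e.getKey());
            if (buffer != null)
                purge(buffer, e.getValue());
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> buf = new TreeMap<>();
        for (long i = 1; i <= 5; i++)
            buf.put(i, "m" + i);
        Map<String, NavigableMap<Long, String>> table = new HashMap<>();
        table.put("P", buf);
        stable(table, Map.of("P", 3L));
        System.out.println(buf.keySet());
    }
}
```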
retransmit
protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request) -
retransmit
-
reset
protected void reset() -
sizeOfAllMessages
-
startRetransmitTask
protected void startRetransmitTask() -
stopRetransmitTask
protected void stopRetransmitTask() -
triggerXmit
public void triggerXmit()
-