Package org.jgroups.protocols
Class UNICAST3
java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.UNICAST3
- All Implemented Interfaces:
Lifecycle,AgeOutCache.Handler<Address>
Reliable unicast protocol using a combination of positive and negative acks. See docs/design/UNICAST3.txt for details.
- Since:
- 3.3
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected classprotected final classprotected classRetransmitter task which periodically (every xmit_interval ms): If any of the receiver windows have the ack flag set, clears the flag and sends an ack for the highest delivered seqno to the sender Checks all receiver windows for missing messages and asks senders for retransmission For all sender windows, checks if highest acked (HA) < highest sent (HS).protected final classprotected static enum -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected intprotected final AverageMinMaxprotected static final BiConsumer<MessageBatch, Message> protected AgeOutCache<Address> protected longprotected longprotected static final longprotected static final longprotected static final Messageprotected booleanprotected shortprotected ExpiryCache<Address> Keep track of when a SEND_FIRST_SEQNO message was sent to a given senderprotected booleanprotected booleanprotected intprotected longprotected intprotected final MessageCacheprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected final ConcurrentMap<Address, UNICAST3.ReceiverEntry> protected final ReentrantLockprotected booleanprotected booleanprotected final ConcurrentMap<Address, UNICAST3.SenderEntry> protected booleanprotected longprotected TimeServiceprotected TimeSchedulerprotected final AtomicIntegerprotected longprotected final LongAdderprotected final LongAdderprotected final LongAdderprotected longprotected intprotected intprotected doubleprotected Future<?> RetransmitTask running every xmit_interval msUsed by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539protected booleanFields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected static intaccumulate(ToIntFunction<Table<Message>> func, Collection<? extends UNICAST3.Entry>... entries) protected voidaddMessage(UNICAST3.ReceiverEntry entry, Address sender, long seqno, Message msg) protected voidaddQueuedMessages(Address sender, UNICAST3.ReceiverEntry entry, Collection<Message> queued_msgs) voidcloseConnection(Address mbr) Removes and resets from connection table (which is already locked).voidvoidvoidprotected static intcompare(int ts1, int ts2) Compares 2 timestamps, handles numeric overflowprotected UNICAST3.ReceiverEntrycreateReceiverEntry(Address sender, long seqno, short conn_id, Address dest) createTable(long seqno) protected voiddeliverBatch(MessageBatch batch) protected voiddeliverMessage(Message msg, Address sender, long seqno) An event is to be sent down the stack.A message is sent down the stack.voidCalled by AgeOutCache, to removed expired connectionsintintlonglonglongintprotected shortlonglongintlonglongDeprecated.longDeprecated.intintintThe number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)longprotected UNICAST3.ReceiverEntrygetReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address dest) protected UNICAST3.SenderEntrygetSenderEntry(Address dst) getSendWindow(Address target) Used for testing only!longprotected longintlongintlongintintintintintintintdoubleintprotected voidhandleAckReceived(Address sender, long seqno, short conn_id, int timestamp) Add the ACK to hashtable.sender.sent_msgsprotected voidhandleBatchFromSelf(MessageBatch batch, UNICAST3.Entry entry) protected voidhandleBatchReceived(UNICAST3.ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob, Address original_dest) protected voidhandleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg) Check whether the hashtable contains an entry e forsender(create if not).protected voidhandleDataReceivedFromSelf(Address sender, long seqno, Message msg) Called when the sender of a message is the local member.protected voidhandleResendingOfFirstMessage(Address sender, int timestamp) We need to resend the first message with our conn_idprotected voidhandleUpEvent(Address sender, Message msg, UnicastHeader3 hdr) protected voidhandleXmitRequest(Address sender, SeqnoList missing) booleanhasSendConnectionTo(Address dest) Used for testing onlyvoidinit()Called after a protocol has been created and before the protocol is started.protected static booleanprotected booleanprotected booleanisLocalSiteMaster(Address dest) booleanbooleanbooleanlogNotFoundMsgs(boolean l) booleanloopback()loopback(boolean b) protected StringprintMessageList(List<LongTuple<Message>> list) voidThis method is public only so it can be invoked by unit testing, but should not otherwise be used !protected voidremoveAndDeliver(Table<Message> win, Address sender) Try to remove as many messages as possible from the table as pass them up.intremoveConnections(boolean remove_send_connections, boolean remove_receive_connections) Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).intvoidvoidremoveSendConnection(Predicate<Address> pred) voidprotected voidvoidprotected voidretransmit(Message msg) Called by the sender to resend messages for which no ACK has been received yetprotected voidretransmit(SeqnoList missing, Address sender, Address real_dest) Sends a retransmit request to the given senderprotected voidvoidvoidprotected voidsendRequestForFirstSeqno(Address dest, Address original_dest) booleansendsCanBlock(boolean s) setAckThreshold(int a) setConnCloseTimeout(long c) setConnExpiryTimeout(long c) <T extends Protocol>
TSets the level of a logger.setMaxRetransmitTime(long max_retransmit_time) setMaxXmitReqSize(int m) setSyncMinInterval(long s) setXmitInterval(long i) setXmitsEnabled(boolean b) setXmitTableMaxCompactionTime(long x) setXmitTableMsgsPerRow(int n) setXmitTableNumRows(int n) setXmitTableResizeFactor(double x) voidstart()This method is called on aJChannel.connect(String); starts work.voidvoidstop()Called on aJChannel.disconnect(); stops work (e.g.voidvoidA single message was received.voidup(MessageBatch batch) Sends up a multiple messages in aMessageBatch.protected voidupdate(UNICAST3.Entry entry, int num_received) Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, providedUpServices, removePolicy, requiredDownServices, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString, up
-
Field Details
-
DEFAULT_FIRST_SEQNO
protected static final long DEFAULT_FIRST_SEQNO- See Also:
-
DEFAULT_XMIT_INTERVAL
protected static final long DEFAULT_XMIT_INTERVAL- See Also:
-
conn_expiry_timeout
protected long conn_expiry_timeout -
conn_close_timeout
protected long conn_close_timeout -
xmit_table_num_rows
protected int xmit_table_num_rows -
xmit_table_msgs_per_row
protected int xmit_table_msgs_per_row -
xmit_table_resize_factor
protected double xmit_table_resize_factor -
xmit_table_max_compaction_time
protected long xmit_table_max_compaction_time -
max_retransmit_time
protected long max_retransmit_time -
xmit_interval
protected long xmit_interval -
xmits_enabled
protected boolean xmits_enabled -
log_not_found_msgs
protected boolean log_not_found_msgs -
ack_threshold
protected int ack_threshold -
sync_min_interval
protected long sync_min_interval -
max_xmit_req_size
protected int max_xmit_req_size -
max_batch_size
protected int max_batch_size -
loopback
protected boolean loopback -
num_msgs_sent
-
num_msgs_received
-
num_acks_sent
-
num_acks_received
-
num_xmits
-
xmit_reqs_received
-
xmit_reqs_sent
-
xmit_rsps_sent
-
avg_delivery_batch_size
-
sends_can_block
protected boolean sends_can_block -
is_trace
protected boolean is_trace -
relay_present
protected boolean relay_present -
send_table
-
recv_table
-
recv_table_lock
-
xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539 -
xmit_task
RetransmitTask running every xmit_interval ms -
members
-
timer
-
running
protected volatile boolean running -
last_conn_id
protected short last_conn_id -
cache
-
time_service
-
timestamper
-
last_sync_sent
Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender -
num_loopbacks
-
msg_cache
-
DUMMY_OOB_MSG
-
drop_oob_and_dont_loopback_msgs_filter
-
dont_loopback_filter
-
BATCH_ACCUMULATOR
-
-
Constructor Details
-
UNICAST3
public UNICAST3()
-
-
Method Details
-
getNumLoopbacks
public long getNumLoopbacks() -
getSendWindow
Used for testing only! -
getNumSendConnections
public int getNumSendConnections() -
getNumReceiveConnections
public int getNumReceiveConnections() -
getNumConnections
public int getNumConnections() -
getTimestamper
public int getTimestamper() -
getAckThreshold
public int getAckThreshold() -
setAckThreshold
-
setLevel
Description copied from class:ProtocolSets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist. -
getXmitInterval
public long getXmitInterval() -
setXmitInterval
-
isXmitsEnabled
public boolean isXmitsEnabled() -
setXmitsEnabled
-
getXmitTableNumRows
public int getXmitTableNumRows() -
setXmitTableNumRows
-
getXmitTableMsgsPerRow
public int getXmitTableMsgsPerRow() -
setXmitTableMsgsPerRow
-
getConnExpiryTimeout
public long getConnExpiryTimeout() -
setConnExpiryTimeout
-
getConnCloseTimeout
public long getConnCloseTimeout() -
setConnCloseTimeout
-
getXmitTableResizeFactor
public double getXmitTableResizeFactor() -
setXmitTableResizeFactor
-
getXmitTableMaxCompactionTime
public long getXmitTableMaxCompactionTime() -
setXmitTableMaxCompactionTime
-
logNotFoundMsgs
public boolean logNotFoundMsgs() -
logNotFoundMsgs
-
getSyncMinInterval
public long getSyncMinInterval() -
setSyncMinInterval
-
getMaxXmitReqSize
public int getMaxXmitReqSize() -
setMaxXmitReqSize
-
sendsCanBlock
public boolean sendsCanBlock() -
sendsCanBlock
-
loopback
public boolean loopback() -
loopback
-
printConnections
-
getNumMessagesSent
Deprecated.Don't remove! https://issues.redhat.com/browse/JGRP-2814 -
getNumMessagesReceived
Deprecated.Don't remove! https://issues.redhat.com/browse/JGRP-2814 -
getNumAcksSent
public long getNumAcksSent() -
getNumAcksReceived
public long getNumAcksReceived() -
getNumXmits
public long getNumXmits() -
getMaxRetransmitTime
public long getMaxRetransmitTime() -
setMaxRetransmitTime
-
isXmitTaskRunning
public boolean isXmitTaskRunning() -
getAgeOutCacheSize
public int getAgeOutCacheSize() -
printAgeOutCache
-
getAgeOutCache
-
hasSendConnectionTo
Used for testing only -
getNumUnackedMessages
public int getNumUnackedMessages()The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) -
getXmitTableUndeliveredMessages
public int getXmitTableUndeliveredMessages() -
getXmitTableMissingMessages
public int getXmitTableMissingMessages() -
getXmitTableDeliverableMessages
public int getXmitTableDeliverableMessages() -
getXmitTableNumCompactions
public int getXmitTableNumCompactions() -
getXmitTableNumMoves
public int getXmitTableNumMoves() -
getXmitTableNumResizes
public int getXmitTableNumResizes() -
getXmitTableNumPurges
public int getXmitTableNumPurges() -
printReceiveWindowMessages
-
printSendWindowMessages
-
resetStats
public void resetStats()- Overrides:
resetStatsin classProtocol
-
init
Description copied from class:ProtocolCalled after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent. -
start
Description copied from class:ProtocolThis method is called on aJChannel.connect(String); starts work. Protocols are connected ready to receive events. Will be called from bottom to top. -
stop
public void stop()Description copied from class:ProtocolCalled on aJChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom. -
up
Description copied from class:ProtocolA single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up. -
handleUpEvent
-
up
Description copied from class:ProtocolSends up a multiple messages in aMessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.The default processing below sends messages up the stack individually, based on a matching criteria (calling
Protocol.accept(Message)), and - if true - callsProtocol.up(org.jgroups.Event)for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.Subclasses should check if there are any messages destined for them (e.g. using
MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done. -
handleBatchFromSelf
-
down
Description copied from class:ProtocolAn event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down(). -
down
Description copied from class:ProtocolA message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down. -
isLocalSiteMaster
-
isLocal
-
closeConnection
Removes and resets from connection table (which is already locked). Returns true if member was found, otherwise false. This method is public only so it can be invoked by unit testing, but should not be used ! -
closeSendConnection
-
closeReceiveConnection
-
removeSendConnection
-
removeSendConnection
-
removeReceiveConnection
-
removeAllConnections
public void removeAllConnections()This method is public only so it can be invoked by unit testing, but should not otherwise be used ! -
retransmit
Sends a retransmit request to the given sender -
retransmit
Called by the sender to resend messages for which no ACK has been received yet -
expired
Called by AgeOutCache, to removed expired connections- Specified by:
expiredin interfaceAgeOutCache.Handler<Address>- Parameters:
key-
-
handleDataReceived
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg) Check whether the hashtable contains an entry e forsender(create if not). If e.received_msgs is null andfirstis true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message. -
addMessage
-
addQueuedMessages
protected void addQueuedMessages(Address sender, UNICAST3.ReceiverEntry entry, Collection<Message> queued_msgs) -
handleDataReceivedFromSelf
Called when the sender of a message is the local member. In this case, we don't need to add the message to the table as the sender already did that -
handleBatchReceived
-
removeAndDeliver
Try to remove as many messages as possible from the table as pass them up. Prevents concurrent passing up of messages by different threads (https://issues.redhat.com/browse/JGRP-198); lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time. We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the order in which they were sent -
printMessageList
-
getReceiverEntry
protected UNICAST3.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address dest) -
getSenderEntry
-
createReceiverEntry
protected UNICAST3.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id, Address dest) -
createTable
-
handleAckReceived
Add the ACK to hashtable.sender.sent_msgs -
handleResendingOfFirstMessage
We need to resend the first message with our conn_id- Parameters:
sender-
-
handleXmitRequest
-
resend
-
deliverMessage
-
deliverBatch
-
getTimestamp
protected long getTimestamp() -
startRetransmitTask
public void startRetransmitTask() -
stopRetransmitTask
public void stopRetransmitTask() -
isCallerRunsHandler
-
sendAck
-
getNewConnectionId
protected short getNewConnectionId() -
sendRequestForFirstSeqno
-
sendClose
-
closeIdleConnections
public void closeIdleConnections() -
removeExpiredConnections
public int removeExpiredConnections() -
removeConnections
public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections) Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).- Parameters:
remove_send_connections- If true, send connections whose state is !OPEN are destroyed and removedremove_receive_connections- If true, receive connections with state !OPEN are destroyed and removed- Returns:
- The number of connections which were removed
-
triggerXmit
public void triggerXmit() -
sendPendingAcks
public void sendPendingAcks() -
update
-
compare
protected static int compare(int ts1, int ts2) Compares 2 timestamps, handles numeric overflow -
accumulate
@SafeVarargs protected static int accumulate(ToIntFunction<Table<Message>> func, Collection<? extends UNICAST3.Entry>... entries)
-