Class SEQUENCER

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.SEQUENCER
All Implemented Interfaces:
Lifecycle

public class SEQUENCER extends Protocol
Implementation of a total order protocol using a sequencer. Consult SEQUENCER.txt for details.
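The general idea of sequencer-based total order can be sketched in plain Java. This is a simplified illustration, not the SEQUENCER implementation: the classes `SequencerSketch`, `Member` and `Coordinator` and the method `onForward` are hypothetical names, there is no networking, and failure handling is omitted. Senders forward their messages to a single coordinator, which assigns the next sequence number and broadcasts the message, so every member delivers the same messages in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SequencerSketch {
    /** Hypothetical group member that records what it delivers, in order. */
    static class Member {
        final List<String> delivered = new ArrayList<>();

        void deliver(long seqno, String payload) {
            delivered.add(seqno + ":" + payload);
        }
    }

    /** Hypothetical coordinator: the single point that orders all messages. */
    static class Coordinator {
        final AtomicLong seqno = new AtomicLong(0);
        final List<Member> members;

        Coordinator(List<Member> members) {
            this.members = members;
        }

        // A forwarded message gets the next seqno and is broadcast to every member.
        // The method is synchronized so that assignment and broadcast happen as one step.
        synchronized void onForward(String sender, String payload) {
            long s = seqno.incrementAndGet();
            for (Member m : members)
                m.deliver(s, sender + "/" + payload);
        }
    }

    /** Runs a tiny scenario and returns the delivery log of each member. */
    static List<List<String>> run() {
        Member a = new Member(), b = new Member(), c = new Member();
        Coordinator coord = new Coordinator(List.of(a, b, c));
        coord.onForward("B", "x");
        coord.onForward("C", "y");
        coord.onForward("B", "z");
        return List.of(a.delivered, b.delivered, c.delivered);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

All three delivery logs are identical because the order is decided in one place; the cost is that every multicast takes an extra hop through the coordinator.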
  • Field Details

    • coord

      protected volatile Address coord
    • view

      protected volatile View view
    • is_coord

      protected volatile boolean is_coord
    • seqno

      protected final AtomicLong seqno
    • forward_table

      protected final NavigableMap<Long,Message> forward_table
      Maintains messages forwarded to the coord for which no ack has been received yet. Needs to be sorted so we resend them in the right order
    • send_lock

      protected final Lock send_lock
    • send_cond

      protected final Condition send_cond
    • ack_mode

      protected volatile boolean ack_mode
      When ack_mode is set, we need to wait for an ack for each forwarded message before we can send the next one
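      Waiting for an ack before sending the next message can be sketched with a Lock and a Condition. This is an illustration under assumptions, not the way SEQUENCER implements ack_mode: the class `AckModeSketch` and the methods `awaitAck`, `ack` and `demo` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AckModeSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition acked = lock.newCondition();
    private long lastAcked = 0; // highest acked seqno, guarded by lock

    /** Blocks until seqno has been acked or the timeout elapses; returns true if acked. */
    boolean awaitAck(long seqno, long timeoutMs) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // Loop to guard against spurious wakeups and acks for older seqnos.
            while (lastAcked < seqno) {
                if (nanos <= 0)
                    return false;
                nanos = acked.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Called when an ack for seqno arrives; wakes up all waiting senders. */
    void ack(long seqno) {
        lock.lock();
        try {
            if (seqno > lastAcked)
                lastAcked = seqno;
            acked.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns {result before any ack, result once an ack arrives}. */
    static boolean[] demo() {
        AckModeSketch s = new AckModeSketch();
        try {
            boolean before = s.awaitAck(1, 50);   // nobody acks: times out
            Thread t = new Thread(() -> s.ack(1));
            t.start();
            boolean after = s.awaitAck(1, 5000);  // ack arrives from the other thread
            t.join();
            return new boolean[] {before, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```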
    • flushing

      protected volatile boolean flushing
      Set when we block all sending threads to resend all messages from forward_table
    • running

      protected volatile boolean running
    • in_flight_sends

      protected final AtomicInteger in_flight_sends
      Keeps track of the threads sending messages
    • delivery_table

      protected final ConcurrentMap<Address,BoundedHashMap<Long,Long>> delivery_table
    • flusher

      protected volatile SEQUENCER.Flusher flusher
    • ack_promise

      protected final Promise<Long> ack_promise
      Used for each resent message to wait until the message has been received
    • msg_factory

      protected MessageFactory msg_factory
    • delivery_table_max_size

      protected int delivery_table_max_size
    • threshold

      protected int threshold
    • flush_forward_table

      protected boolean flush_forward_table
    • num_acks

      protected int num_acks
    • forwarded_msgs

      protected long forwarded_msgs
    • bcast_msgs

      protected long bcast_msgs
    • received_forwards

      protected long received_forwards
    • received_bcasts

      protected long received_bcasts
    • delivered_bcasts

      protected long delivered_bcasts
  • Constructor Details

    • SEQUENCER

      public SEQUENCER()
  • Method Details

    • isCoordinator

      public boolean isCoordinator()
    • getCoordinator

      public Address getCoordinator()
    • getForwardTableSize

      public int getForwardTableSize()
    • setThreshold

      public SEQUENCER setThreshold(int new_threshold)
    • setDeliveryTableMaxSize

      public SEQUENCER setDeliveryTableMaxSize(int size)
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
    • start

      public void start() throws Exception
      Description copied from class: Protocol
      Called on JChannel.connect(String); starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • down

      public Object down(Event evt)
      Description copied from class: Protocol
      An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
      Overrides:
      down in class Protocol
    • down

      public Object down(Message msg)
      Description copied from class: Protocol
      A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
      Overrides:
      down in class Protocol
    • up

      public Object up(Event evt)
      Description copied from class: Protocol
      An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
      Overrides:
      up in class Protocol
    • up

      public Object up(Message msg)
      Description copied from class: Protocol
      A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
      Overrides:
      up in class Protocol
    • up

      public void up(MessageBatch batch)
      Description copied from class: Protocol
      Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

      The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

      Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

      Overrides:
      up in class Protocol
      Parameters:
      batch - The message batch
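      The "remove and process matching messages, then pass the rest up" pattern described above can be sketched without the JGroups types. This is a plain-Java analogy only: a `List<String>` stands in for the MessageBatch, and the class `BatchFilterSketch` and the method `takeMatching` are hypothetical names, not part of the JGroups API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class BatchFilterSketch {
    /**
     * Removes every message accepted by 'mine' from the batch and returns the
     * removed messages in their original order. Whatever remains in 'batch'
     * is what a protocol would pass up to the next layer.
     */
    static List<String> takeMatching(List<String> batch, Predicate<String> mine) {
        List<String> taken = new ArrayList<>();
        for (Iterator<String> it = batch.iterator(); it.hasNext();) {
            String msg = it.next();
            if (mine.test(msg)) {
                taken.add(msg);
                it.remove(); // safe removal while iterating
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>(List.of("seq:a", "other:b", "seq:c"));
        List<String> taken = takeMatching(batch, m -> m.startsWith("seq:"));
        System.out.println("processed here: " + taken);
        System.out.println("passed up:      " + batch);
    }
}
```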
    • handleViewChange

      protected void handleViewChange(View v)
    • flush

      protected void flush(Address new_coord) throws InterruptedException
      Throws:
      InterruptedException
    • handleTmpView

      private void handleTmpView(View v)
    • flushMessagesInForwardTable

      protected void flushMessagesInForwardTable()
      Sends all messages currently in forward_table to the new coordinator (changing the dest field). This needs to be done so that the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism.

      Note that we need to resend the messages in order of their seqnos! We also need to prevent other messages from being inserted until we're done, that's why there's synchronization.

      Access to the forward_table doesn't need to be synchronized as there won't be any insertions during flushing (all down-threads are blocked).
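      Resending unacked messages in seqno order to a new coordinator can be sketched with a sorted map. This is a simplified model under assumptions, not the actual method body: the class `ForwardTableFlushSketch`, the methods `forward`, `onAck` and `flushTo`, and the string encoding of a sent message are all hypothetical, and the blocking of sender threads is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ForwardTableFlushSketch {
    // Unacked forwarded messages, keyed by seqno; the sorted map keeps them in resend order.
    final NavigableMap<Long, String> forwardTable = new ConcurrentSkipListMap<>();
    // Stand-in for the network: each entry is "dest#seqno#payload".
    final List<String> wire = new ArrayList<>();

    /** Remembers a forwarded message until it is acked. */
    void forward(long seqno, String payload) {
        forwardTable.put(seqno, payload);
    }

    /** An ack removes the message from the table, so it is not resent. */
    void onAck(long seqno) {
        forwardTable.remove(seqno);
    }

    /** Resends everything still unacked to the new coordinator, in ascending seqno order. */
    void flushTo(String newCoord) {
        for (Map.Entry<Long, String> e : forwardTable.entrySet())
            wire.add(newCoord + "#" + e.getKey() + "#" + e.getValue());
    }

    static List<String> demo() {
        ForwardTableFlushSketch s = new ForwardTableFlushSketch();
        s.forward(7, "m7"); // insertion order is deliberately not sorted
        s.forward(3, "m3");
        s.forward(5, "m5");
        s.onAck(5);         // 5 was acked by the old coordinator
        s.flushTo("C2");    // hypothetical address of the new coordinator
        return s.wire;
    }
}
```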

    • forwardToCoord

      protected void forwardToCoord(long seqno, Message msg)
    • forward

      protected void forward(Message msg, long seqno, boolean flush)
    • broadcast

      protected void broadcast(Message msg, boolean copy, Address original_sender, long seqno, boolean resend)
    • unwrapAndDeliver

      protected void unwrapAndDeliver(Message msg, boolean flush_ack)
      Unmarshal the original message (in the payload) and then pass it up (unless already delivered)
    • deliver

      protected void deliver(Message msg, SEQUENCER.SequencerHeader hdr)
    • canDeliver

      protected boolean canDeliver(Address sender, long seqno)
      Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender, as the sender in NAKACK will always be the coordinator.
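      Duplicate filtering with a bounded per-sender table can be sketched as follows. This is an illustration under assumptions, not the actual canDeliver logic: `DeliveryTableSketch` and `BoundedMap` are hypothetical classes (the latter a simple LinkedHashMap that evicts its oldest entry, standing in for BoundedHashMap), and senders are plain strings instead of Address objects. As in the description above, the sketch assumes no concurrent calls for the same sender.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DeliveryTableSketch {
    /** Hypothetical bounded map: drops the oldest entry once maxSize is exceeded. */
    static class BoundedMap<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;
        private final int maxSize;

        BoundedMap(int maxSize) {
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxSize;
        }
    }

    private final ConcurrentMap<String, BoundedMap<Long, Long>> deliveryTable = new ConcurrentHashMap<>();
    private final int maxSize;

    DeliveryTableSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns true the first time (sender, seqno) is seen, false for a remembered duplicate. */
    boolean canDeliver(String sender, long seqno) {
        BoundedMap<Long, Long> seen =
            deliveryTable.computeIfAbsent(sender, k -> new BoundedMap<>(maxSize));
        if (seen.containsKey(seqno))
            return false;
        seen.put(seqno, seqno); // may evict the oldest remembered seqno
        return true;
    }
}
```

      Because the table is bounded, a very old duplicate whose seqno has already been evicted would be accepted again; the maximum size trades memory for how far back duplicates are detected.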
    • block

      protected void block()
    • unblockAll

      protected void unblockAll()
    • startFlusher

      protected void startFlusher(Address new_coord)
    • stopFlusher

      protected void stopFlusher()