Class STABLE

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.pbcast.STABLE
All Implemented Interfaces:
Lifecycle

public class STABLE extends Protocol
Computes the broadcast messages that are stable; i.e., have been delivered by all members. Sends STABLE events down the stack when this is the case. This allows NAKACK{2,3} to garbage collect messages that have been seen by all members.

Works as follows: periodically (desired_avg_gossip) or when having received a number of bytes (max_bytes), every member sends its digest (highest seqno delivered, received) to the current coordinator.

When such a message is received, the coordinator updates a stability vector, which maintains the highest seqno delivered/received for each member and initially contains no data.

When messages from all members have been received, a stability message is mcast, which causes all members to send a STABLE event down the stack (triggering garbage collection in the NAKACK{2,3} layer).
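The round described above can be modelled in a few lines. The following is a minimal, self-contained sketch and not the JGroups implementation: the class name `StabilityRoundSketch`, the method `onDigest`, and the use of plain `long[]` arrays in place of `Digest` objects are all assumptions made for illustration. It tracks only the highest delivered seqnos and takes their per-sender minimum, as described for `handleStableMessage`.

```java
import java.util.Arrays;
import java.util.BitSet;

/** Hypothetical coordinator-side model of one stability round (illustration only). */
final class StabilityRoundSketch {
    private final int members;          // number of members in the current view
    private final long[] minDelivered;  // per-sender minimum of the highest delivered seqnos reported so far
    private final BitSet votes;         // bit i is set once the member with rank i has reported

    StabilityRoundSketch(int members) {
        this.members = members;
        this.minDelivered = new long[members];
        this.votes = new BitSet(members);
        Arrays.fill(minDelivered, Long.MAX_VALUE); // "initially contains no data"
    }

    /**
     * Merges the digest reported by the member with the given rank.
     * Returns the stability vector once every member has reported, otherwise null.
     * Assumes highestDelivered has exactly one entry per member.
     */
    long[] onDigest(int rank, long[] highestDelivered) {
        for (int i = 0; i < members; i++)
            minDelivered[i] = Math.min(minDelivered[i], highestDelivered[i]);
        votes.set(rank);
        if (votes.cardinality() < members)
            return null;                            // still waiting for other members
        long[] stable = minDelivered.clone();       // what a stability message would carry
        Arrays.fill(minDelivered, Long.MAX_VALUE);  // start a new round
        votes.clear();
        return stable;
    }
}
```

In this model, seqnos up to and including the values in the returned vector have been delivered by every member, so a retransmission layer could safely discard them.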

  • Field Details

    • MAX_SUSPEND_TIME

      protected static final long MAX_SUSPEND_TIME
    • desired_avg_gossip

      protected long desired_avg_gossip
      Sends a STABLE gossip every 20 seconds on average. 0 disables gossiping of STABLE messages.
    • max_bytes

      protected long max_bytes
      Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE message will be broadcast and num_bytes_received reset to 0. If this is > 0, then ideally stability_delay should be set to a low number as well.
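To illustrate the byte-count trigger, here is a minimal sketch under stated assumptions. The class `ByteThresholdSketch` is hypothetical and only borrows the names `max_bytes`, `num_bytes_received` and `maxBytesExceeded` from this page; whether the real check uses a strict or non-strict comparison is not stated here, so the strict `>` below is an assumption.

```java
/** Hypothetical model of a max_bytes-style trigger (illustration only). */
final class ByteThresholdSketch {
    private final long maxBytes;    // 0 disables the trigger, mirroring the documented default
    private long numBytesReceived;  // bytes accumulated since the last reset

    ByteThresholdSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Adds len bytes; returns true and resets the counter once the total exceeds maxBytes. */
    synchronized boolean maxBytesExceeded(int len) {
        if (maxBytes <= 0)
            return false;                 // trigger disabled
        numBytesReceived += len;
        if (numBytesReceived <= maxBytes) // assumption: strictly "exceeded"
            return false;
        numBytesReceived = 0;             // caller would now send a STABLE message
        return true;
    }

    synchronized long bytes() {
        return numBytesReceived;
    }
}
```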
    • num_stable_msgs_sent

      protected final LongAdder num_stable_msgs_sent
    • num_stable_msgs_received

      protected final LongAdder num_stable_msgs_received
    • num_stability_msgs_sent

      protected final LongAdder num_stability_msgs_sent
    • num_stability_msgs_received

      protected final LongAdder num_stability_msgs_received
    • view

      protected volatile View view
    • digest

      protected volatile MutableDigest digest
    • votes

      protected FixedSizeBitSet votes
      Keeps track of who we have already heard from (STABLE_GOSSIP msgs). The bitset is initially all 0's, and the sender's bit is set when a STABLE message is received. When the bitset is all 1's (responses from all members), we send a STABILITY message.
    • lock

      protected final Lock lock
    • stable_task_future

      protected Future<?> stable_task_future
    • stable_task_lock

      protected final Lock stable_task_lock
    • timer

      protected TimeScheduler timer
    • num_bytes_received

      protected long num_bytes_received
      The total number of bytes received from unicast and multicast messages
    • received

      protected final Lock received
    • suspended

      protected volatile boolean suspended
      When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messages
    • initialized

      protected boolean initialized
    • resume_task_future

      protected Future<?> resume_task_future
    • resume_task_mutex

      protected final Object resume_task_mutex
    • coordinator

      protected volatile Address coordinator
  • Constructor Details

    • STABLE

      public STABLE()
  • Method Details

    • getDesiredAverageGossip

      public long getDesiredAverageGossip()
    • setDesiredAverageGossip

      public STABLE setDesiredAverageGossip(long g)
    • getMaxBytes

      public long getMaxBytes()
    • setMaxBytes

      public STABLE setMaxBytes(long m)
    • getBytes

      public long getBytes()
    • getNumVotes

      public int getNumVotes()
    • getStableReceived

      public long getStableReceived()
    • getStableSent

      public long getStableSent()
    • getStabilityReceived

      public long getStabilityReceived()
    • getStabilitySent

      public long getStabilitySent()
    • getStableTaskRunning

      public boolean getStableTaskRunning()
    • gc

      public void gc()
    • printDigest

      public String printDigest()
    • printVotes

      public String printVotes()
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • requiredDownServices

      public List<Integer> requiredDownServices()
      Description copied from class: Protocol
      List of events that are required to be answered by some layer below
      Overrides:
      requiredDownServices in class Protocol
    • suspend

      protected void suspend(long timeout)
    • resume

      protected void resume()
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
    • start

      public void start() throws Exception
      Description copied from class: Protocol
      This method is called on a JChannel.connect(String); starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • up

      public Object up(Event evt)
      Description copied from class: Protocol
      An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
      Overrides:
      up in class Protocol
    • up

      public Object up(Message msg)
      Description copied from class: Protocol
      A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
      Overrides:
      up in class Protocol
    • up

      public void up(MessageBatch batch)
      Description copied from class: Protocol
      Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

      The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

      Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

      Overrides:
      up in class Protocol
      Parameters:
      batch - The message batch
    • down

      public Object down(Message msg)
      Description copied from class: Protocol
      A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
      Overrides:
      down in class Protocol
    • down

      public Object down(Event evt)
      Description copied from class: Protocol
      An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
      Overrides:
      down in class Protocol
    • handle

      protected Object handle(STABLE.StableHeader hdr, Address sender, Digest digest)
    • handleRegularMessage

      protected void handleRegularMessage(Message msg)
    • maxBytesExceeded

      protected boolean maxBytesExceeded(int len)
    • handleViewChange

      protected void handleViewChange(View v)
    • updateLocalDigest

      protected void updateLocalDigest(Digest d, Address sender)
      Updates my own digest from a digest received from another member. Needs to be called with a lock on digest.
    • resetDigest

      protected void resetDigest()
    • addVote

      protected boolean addVote(int rank)
      Adds the member with the given rank to votes and returns true if we have all the votes, otherwise false.
      Parameters:
      rank - The rank of the member whose vote is added
    • allVotesReceived

      protected static boolean allVotesReceived(FixedSizeBitSet votes)
      Votes is already locked and guaranteed to be non-null
    • getRank

      protected static int getRank(Address member, View v)
    • startStableTask

      protected void startStableTask()
    • stopStableTask

      protected void stopStableTask()
    • startResumeTask

      protected void startResumeTask(long max_suspend_time)
    • stopResumeTask

      protected void stopResumeTask()
    • handleStableMessage

      protected void handleStableMessage(Digest d, Address sender, ViewId view_id)
      Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability message, which results in garbage collection of messages lower than the ones in the stability vector. The maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN for details).
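The difference between the two seqnos in the example above can be checked with a small sketch. The class `SeqnoSketch` and its methods are hypothetical and not part of JGroups; the sketch assumes that seqnos start at 1 and that a message is deliverable only once all lower seqnos from the same sender have been received.

```java
import java.util.TreeSet;

/** Hypothetical helper contrasting "highest deliverable" with "highest seen" (illustration only). */
final class SeqnoSketch {
    /** Highest seqno n such that 1..n were all received; 0 if seqno 1 is missing. */
    static long highestDeliverable(TreeSet<Long> received) {
        long hd = 0;
        while (received.contains(hd + 1)) // stop at the first gap
            hd++;
        return hd;
    }

    /** Highest seqno received at all, regardless of gaps; 0 if nothing was received. */
    static long highestSeen(TreeSet<Long> received) {
        return received.isEmpty() ? 0 : received.last();
    }
}
```

For the received set 1,2,4,5 this yields a highest deliverable seqno of 2 (seqno 3 is missing) and a highest seen seqno of 5, matching the example in the description.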
    • resetNumBytes

      protected void resetNumBytes()
    • handleStabilityMessage

      protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)
    • sendStableMessage

      protected void sendStableMessage(boolean send_in_background)
      Broadcasts a STABLE message of the current digest to all members (or the coordinator only). The message contains the highest seqno delivered and received for all members. The seqnos are retrieved from the NAKACK layer below.
    • sendStabilityMessage

      protected void sendStabilityMessage(Digest d, ViewId view_id)
      Sends a stability message to all members except self.
      Parameters:
      d - A copy of the stability digest, so we don't need to copy it again
    • getDigest

      protected Digest getDigest()
    • printDigest

      protected String printDigest(Digest digest)