Class StreamingStateTransfer

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.pbcast.StreamingStateTransfer
All Implemented Interfaces:
Lifecycle, ProcessingQueue.Handler<Address>
Direct Known Subclasses:
STATE, STATE_SOCK

public abstract class StreamingStateTransfer extends Protocol implements ProcessingQueue.Handler<Address>
Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members.

The major advantage of this approach is that transferring application state to a joining member of a group does not require loading the complete application state into memory. The application state might, for example, reside entirely on some form of disk-based storage. The default STATE_TRANSFER protocol requires this state to be loaded entirely into memory before it is transferred to a group member, whereas the streaming state transfer protocols do not. The streaming state transfer protocols can therefore transfer very large application state (> 1 GB) with little risk of the transfer causing an OutOfMemoryError.
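The memory argument above can be illustrated with a minimal sketch in plain Java. This is not JGroups code: ChunkedCopy, copy and bufferSize are hypothetical names, and the streams stand in for whatever disk-based storage and transport an application actually uses. The point is only that a fixed-size buffer bounds memory use regardless of how large the state is.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper, for illustration only.
final class ChunkedCopy {

    /**
     * Copies everything from in to out through a buffer of bufferSize bytes.
     * At most bufferSize bytes of state are held in memory at any time.
     * Returns the number of bytes copied.
     */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0)
            throw new IllegalArgumentException("bufferSize must be positive");
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {   // -1 signals end of stream
            out.write(buffer, 0, n);            // write only the bytes actually read
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        // In-memory streams keep the demo self-contained; a real application
        // would more likely read from a file and write to a network stream.
        byte[] state = new byte[10_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(state), sink, 1024);
        System.out.println("copied " + copied + " bytes");
    }
}
```

An in-memory approach would instead need a byte array as large as the entire state before the first byte could be sent.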

Note that prior to 3.0, there was only one streaming protocol: STATE. In 3.0, the functionality was split between STATE and STATE_SOCK, and the common functionality was moved up into StreamingStateTransfer.

Since:
3.0
  • Field Details

    • buffer_size

      protected int buffer_size
    • max_pool

      protected int max_pool
    • pool_thread_keep_alive

      protected long pool_thread_keep_alive
    • num_state_reqs

      protected final LongAdder num_state_reqs
    • num_bytes_sent

      protected final LongAdder num_bytes_sent
    • avg_state_size

      protected double avg_state_size
    • state_provider

      protected volatile Address state_provider
    • members

      protected final List<Address> members
    • flushProtocolInStack

      protected volatile boolean flushProtocolInStack
    • thread_pool

      protected ThreadPoolExecutor thread_pool
      Thread pool (configured with max_pool and pool_thread_keep_alive) to run StreamingStateTransfer.StateGetter threads on
    • state_requesters

      protected final ProcessingQueue<Address> state_requesters
      List of members requesting state. Only a single state request is handled at any time
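As a rough illustration of how a pool such as thread_pool could be derived from max_pool and pool_thread_keep_alive, the sketch below builds a ThreadPoolExecutor from two such values using only java.util.concurrent. It is one plausible configuration under stated assumptions, not the code behind createThreadPool(): StatePoolSketch and newPool are hypothetical names, the keep-alive is assumed to be in milliseconds, and the values 5 and 20000 are arbitrary examples.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
final class StatePoolSketch {

    /**
     * Builds a pool with no core threads and at most maxPool workers.
     * Idle workers are reclaimed after keepAliveMillis (assumed unit: ms).
     */
    static ThreadPoolExecutor newPool(int maxPool, long keepAliveMillis) {
        // SynchronousQueue hands each task directly to a worker instead of
        // queueing it; once maxPool workers are busy, new tasks are rejected.
        return new ThreadPoolExecutor(0, maxPool, keepAliveMillis,
                                      TimeUnit.MILLISECONDS, new SynchronousQueue<>());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool(5, 20_000);   // arbitrary example values
        pool.execute(() ->
            System.out.println("task running on " + Thread.currentThread().getName()));
        pool.shutdown();                                // accept no new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS);     // wait for the task to finish
    }
}
```

With a core size of zero the pool holds no threads while no state transfer is in progress, which suits work that happens only occasionally.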
  • Constructor Details

    • StreamingStateTransfer

      public StreamingStateTransfer()
  • Method Details

    • getNumberOfStateRequests

      public long getNumberOfStateRequests()
    • getNumberOfStateBytesSent

      public long getNumberOfStateBytesSent()
    • getAverageStateSize

      public double getAverageStateSize()
    • getThreadPoolSize

      public int getThreadPoolSize()
    • getThreadPoolCompletedTasks

      public long getThreadPoolCompletedTasks()
    • requiredDownServices

      public List<Integer> requiredDownServices()
      Description copied from class: Protocol
      List of events that are required to be answered by some layer below
      Overrides:
      requiredDownServices in class Protocol
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class Protocol
    • init

      public void init() throws Exception
      Description copied from class: Protocol
      Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
      Specified by:
      init in interface Lifecycle
      Overrides:
      init in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
    • destroy

      public void destroy()
      Description copied from class: Protocol
      This method is called on a JChannel.close(). Does some cleanup; after the call, the VM will terminate
      Specified by:
      destroy in interface Lifecycle
      Overrides:
      destroy in class Protocol
    • start

      public void start() throws Exception
      Description copied from class: Protocol
      This method is called on a JChannel.connect(String); starts work. Protocols are connected ready to receive events. Will be called from bottom to top.
      Specified by:
      start in interface Lifecycle
      Overrides:
      start in class Protocol
      Throws:
      Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
    • stop

      public void stop()
      Description copied from class: Protocol
      Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
      Specified by:
      stop in interface Lifecycle
      Overrides:
      stop in class Protocol
    • down

      public Object down(Event evt)
      Description copied from class: Protocol
      An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
      Overrides:
      down in class Protocol
    • up

      public Object up(Event evt)
      Description copied from class: Protocol
      An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
      Overrides:
      up in class Protocol
    • up

      public Object up(Message msg)
      Description copied from class: Protocol
      A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
      Overrides:
      up in class Protocol
    • up

      public void up(MessageBatch batch)
      Description copied from class: Protocol
      Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

      The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

      Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

      Overrides:
      up in class Protocol
      Parameters:
      batch - The message batch
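The batch handling pattern described above (pick out the messages meant for this protocol, remove and process them, then pass on whatever is left) can be sketched in plain Java. This sketch does not use the JGroups MessageBatch API: BatchFilterSketch and process are hypothetical names, a java.util.List stands in for the batch, and the upProt consumer stands in for the next protocol up the stack.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper, for illustration only.
final class BatchFilterSketch {

    /**
     * Removes every message matching mine from batch and hands it to handler.
     * If any messages remain, the batch is forwarded to upProt; an empty
     * batch is dropped. Returns the number of messages handled locally.
     */
    static <M> int process(List<M> batch, Predicate<? super M> mine,
                           Consumer<? super M> handler, Consumer<? super List<M>> upProt) {
        int handled = 0;
        for (Iterator<M> it = batch.iterator(); it.hasNext();) {
            M msg = it.next();
            if (mine.test(msg)) {
                it.remove();            // safe removal while iterating
                handler.accept(msg);
                handled++;
            }
        }
        if (!batch.isEmpty())
            upProt.accept(batch);       // pass the remainder up
        return handled;
    }

    public static void main(String[] args) {
        // Strings stand in for messages; the "state:" prefix stands in for a header check.
        List<String> batch = new ArrayList<>(Arrays.asList("state:1", "app:a", "state:2"));
        int n = process(batch, m -> m.startsWith("state:"),
                        m -> System.out.println("handled " + m),
                        rest -> System.out.println("passed up " + rest));
        System.out.println(n + " messages handled locally");
    }
}
```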
    • handle

      protected Object handle(StreamingStateTransfer.StateHeader hdr, Message msg)
    • isDigestNeeded

      protected boolean isDigestNeeded()
      When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)
      Returns:
      true if use of digests is required, false otherwise
    • handleConfig

      protected void handleConfig(Map<String,Object> config)
    • handleStateChunk

      protected void handleStateChunk(Address sender, byte[] buffer, int offset, int length)
    • handleEOF

      protected void handleEOF(Address sender)
    • handleException

      protected void handleException(Throwable exception)
    • getStateFromApplication

      protected void getStateFromApplication(Address requester, OutputStream out, boolean use_separate_thread)
    • setStateInApplication

      protected void setStateInApplication(InputStream in, Object resource, Address provider)
    • closeBarrierAndSuspendStable

      public void closeBarrierAndSuspendStable()
    • openBarrierAndResumeStable

      public void openBarrierAndResumeStable()
    • openBarrier

      protected void openBarrier()
    • resumeStable

      protected void resumeStable()
    • sendEof

      protected void sendEof(Address requester)
    • sendException

      protected void sendException(Address requester, Throwable exception)
    • createThreadPool

      protected ThreadPoolExecutor createThreadPool()
    • determineCoordinator

      protected Address determineCoordinator()
    • handleViewChange

      protected void handleViewChange(View v)
    • handle

      public void handle(Address state_requester) throws Exception
      Specified by:
      handle in interface ProcessingQueue.Handler<Address>
      Throws:
      Exception
    • handleStateReq

      protected void handleStateReq(Address requester) throws Exception
      Throws:
      Exception
    • createStreamToRequester

      protected void createStreamToRequester(Address requester)
      Creates an OutputStream to the state requester to write the state
    • createStreamToProvider

      protected abstract Tuple<InputStream,Object> createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr) throws Exception
      Creates an InputStream to the state provider to read the state. Returns the input stream and a handback object as a tuple. The handback object is handed back to the subclass when done, or in case of an error (e.g. to clean up resources).
      Throws:
      Exception
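The handback idea described above can be sketched without any JGroups types. In the sketch below, everything is hypothetical: HandbackSketch, Connection, createStream and readState are illustrative names, Map.Entry stands in for Tuple, and Connection stands in for whatever resource (a socket, for instance) a subclass might need to release. The sketch assumes the handback is released in both the success and the error case, which is what a finally block gives.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical types, for illustration only.
final class HandbackSketch {

    /** Stand-in for a connection to the state provider; records whether it was closed. */
    static final class Connection implements Closeable {
        private final byte[] state;
        private boolean closed;
        Connection(byte[] state) { this.state = state; }
        InputStream openStream() { return new ByteArrayInputStream(state); }
        boolean isClosed() { return closed; }
        @Override public void close() { closed = true; }
    }

    /** Returns the stream together with the resource that must be cleaned up later. */
    static Map.Entry<InputStream, Object> createStream(Connection conn) {
        return new AbstractMap.SimpleImmutableEntry<>(conn.openStream(), conn);
    }

    /** Reads the state to the end and always hands the resource back for cleanup. */
    static long readState(Connection conn) throws IOException {
        Map.Entry<InputStream, Object> tuple = createStream(conn);
        long total = 0;
        try (InputStream in = tuple.getKey()) {
            byte[] buffer = new byte[4];
            int n;
            while ((n = in.read(buffer)) != -1)
                total += n;
        } finally {
            close(tuple.getValue());    // runs on success and on error
        }
        return total;
    }

    /** Releases the handback object if it is something that can be closed. */
    static void close(Object resource) {
        if (resource instanceof Closeable) {
            try { ((Closeable) resource).close(); }
            catch (IOException ignored) { /* best-effort cleanup */ }
        }
    }

    public static void main(String[] args) throws IOException {
        Connection conn = new Connection(new byte[10]);
        System.out.println("read " + readState(conn) + " bytes, closed=" + conn.isClosed());
    }
}
```

Typing the handback as Object keeps the caller independent of the transport; only the code that created the resource needs to know what it is.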
    • close

      protected void close(Object resource)
    • useAsyncStateDelivery

      protected boolean useAsyncStateDelivery()
    • modifyStateResponseHeader

      protected void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
    • handleStateRsp

      protected void handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
    • punchHoleFor

      protected void punchHoleFor(Address member)
    • closeHoleFor

      protected void closeHoleFor(Address member)