Class RingBufferBundlerLockless

java.lang.Object
org.jgroups.protocols.BaseBundler
org.jgroups.protocols.RingBufferBundlerLockless
All Implemented Interfaces:
Bundler

public class RingBufferBundlerLockless extends BaseBundler
Bundler which doesn't use locks but relies on CAS. There is 1 reader thread which gets unparked by (exactly one) writer when the max size has been exceeded, or no other threads are sending messages.
Since:
4.0
  • Field Details

    • buf

      protected Message[] buf
    • read_index

      protected int read_index
    • write_index

      protected volatile int write_index
    • tmp_write_index

      protected final AtomicInteger tmp_write_index
    • write_permits

      protected final AtomicInteger write_permits
    • size

      protected final AtomicInteger size
    • num_threads

      protected final AtomicInteger num_threads
    • accumulated_bytes

      protected final AtomicLong accumulated_bytes
    • unparking

      protected final AtomicBoolean unparking
    • bundler_thread

      protected Runner bundler_thread
    • THREAD_NAME

      protected static final String THREAD_NAME
      See Also:
    • run_function

      protected final Runnable run_function
  • Constructor Details

    • RingBufferBundlerLockless

      public RingBufferBundlerLockless()
    • RingBufferBundlerLockless

      public RingBufferBundlerLockless(int capacity)
  • Method Details

    • readIndex

      public int readIndex()
    • writeIndex

      public int writeIndex()
    • size

      public int size()
      Description copied from class: BaseBundler
      Returns the total number of messages in the hashmap
      Specified by:
      size in interface Bundler
      Overrides:
      size in class BaseBundler
    • getQueueSize

      public int getQueueSize()
      Description copied from interface: Bundler
      If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.

      This method needs to be fast as it might get called on every message to be sent.

      Specified by:
      getQueueSize in interface Bundler
      Overrides:
      getQueueSize in class BaseBundler
    • init

      public void init(TP transport)
      Description copied from interface: Bundler
      Called after creation of the bundler
      Specified by:
      init in interface Bundler
      Overrides:
      init in class BaseBundler
      Parameters:
      transport - the transport, for further reference
    • reset

      public void reset()
    • start

      public void start()
      Description copied from interface: Bundler
      Called after Bundler.init(TP)
      Specified by:
      start in interface Bundler
      Overrides:
      start in class BaseBundler
    • stop

      public void stop()
      Specified by:
      stop in interface Bundler
      Overrides:
      stop in class BaseBundler
    • renameThread

      public void renameThread()
    • send

      public void send(Message msg) throws Exception
      Specified by:
      send in interface Bundler
      Overrides:
      send in class BaseBundler
      Throws:
      Exception
    • getWriteIndex

      protected int getWriteIndex()
    • getPermitToWrite

      protected int getPermitToWrite()
    • advanceWriteIndex

      protected int advanceWriteIndex()
    • readMessages

      protected void readMessages()
    • sendBundledMessages

      protected int sendBundledMessages(Message[] buf, int read_index, int available_msgs)
      Read and send messages in range [read-index .. read-index+available_msgs-1]
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • _readMessages

      public int _readMessages()
    • marshalMessagesToSameDestination

      protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size) throws Exception
      Throws:
      Exception
    • increment

      protected final int increment(int index)
    • index

      protected final int index(int idx)
    • assertPositive

      protected static int assertPositive(int value, String message)