Class RingBufferSeqno<T>

java.lang.Object
org.jgroups.util.RingBufferSeqno<T>
All Implemented Interfaces:
Iterable<T>

public class RingBufferSeqno<T> extends Object implements Iterable<T>
Ring buffer, implemented with a circular array. Designed for multiple producers (add()) and a single consumer (remove()). Note that the remove() methods are not reentrant, so multiple consumers won't work correctly !

The buffer has a fixed capacity, and a low (LOW), highest delivered (HD) and highest received (HR) seqno.

An element with a sequence number (seqno) > low + capacity or < HD will get discarded.

Elements are added after HD, but cannot wrap around beyond LOW. Addition doesn't need to be sequential, e.g. adding 5, 6, 8 is OK (as long as a seqno doesn't pass LOW). Addition may advance HR. Addition of elements that are already present is a no-op, and will not set the element again.

Removal of elements starts at HD+1; any non-null element is removed and HD is advanced accordingly. If a remove method is called with nullify=true, then removed elements are nulled and LOW is advanced as well (LOW=HD). Note that all removals in a given RingBuffer must either have nullify=true, or all must be false. It is not permitted to do some removals with nullify=true, and others with nullify=false, in the same RingBuffer.

The stable(long) method is called periodically; it nulls all elements between LOW and HD and advances LOW to HD.

The design of RingBuffer is discussed in doc/design/RingBufferSeqno.txt.

Since:
3.1
  • Field Details

    • buf

      protected final T[] buf
      Atomic ref array so that elements can be checked for null and set atomically. Should always be sized to a power of 2.
    • low

      protected long low
      The lowest seqno. Moved forward by stable()
    • hd

      protected long hd
      The highest delivered seqno. Moved forward by a remove method. The next message to be removed is hd +1
    • hr

      protected long hr
      The highest received seqno. Moved forward by add(). The next message to be added is hr +1
    • offset

      protected final long offset
    • lock

      protected final Lock lock
      Lock for adders to block on when the buffer is full
    • buffer_full

      protected final Condition buffer_full
    • running

      protected boolean running
    • processing

      protected final AtomicBoolean processing
  • Constructor Details

    • RingBufferSeqno

      public RingBufferSeqno(int capacity, long offset)
      Creates a RingBuffer
      Parameters:
      capacity - The number of elements the ring buffer's array should hold.
      offset - The offset. The first element to be added has to be offset +1.
  • Method Details

    • getLow

      public long getLow()
    • getHighestDelivered

      public long getHighestDelivered()
    • setHighestDelivered

      public void setHighestDelivered(long hd)
    • getHighestReceived

      public long getHighestReceived()
    • getDigest

      public long[] getDigest()
    • getProcessing

      public AtomicBoolean getProcessing()
    • add

      public boolean add(long seqno, T element)
    • add

      public boolean add(long seqno, T element, boolean block)
      Adds a new element to the buffer
      Parameters:
      seqno - The seqno of the element
      element - The element
      block - If true, add() will block when the buffer is full until there is space. Else, add() will return immediately, either successfully or unsuccessfully (if the buffer is full)
      Returns:
      True if the element was added, false otherwise.
    • remove

      public T remove(boolean nullify)
      Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !
      Parameters:
      nullify - Nulls the element in the array if true
      Returns:
      T if there was a non-null element at position hd +1, or null if the element at hd+1 was null, or hd+1 > hr.
    • remove

      public T remove()
      Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !
      Returns:
      T if there was a non-null element at position hd +1, or null if the element at hd+1 was null.
    • removeMany

      public List<T> removeMany(boolean nullify, int max_results)
    • removeMany

      public List<T> removeMany(AtomicBoolean processing, boolean nullify, int max_results)
    • get

      public T get(long seqno)
    • _get

      public T _get(long seqno)
      Only used for testing !!
    • get

      public List<T> get(long from, long to)
      Returns a list of messages in the range [from .. to], including from and to
      Parameters:
      from -
      to -
      Returns:
      A list of messages, or null if none in range [from .. to] was found
    • stable

      public void stable(long seqno)
      Nulls elements between low and seqno and forwards low
    • destroy

      public void destroy()
    • capacity

      public final int capacity()
    • size

      public int size()
    • missing

      public int missing()
    • spaceUsed

      public int spaceUsed()
    • saturation

      public double saturation()
    • getMissing

      public SeqnoList getMissing()
    • iterator

      public Iterator<T> iterator()
      Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]
      Specified by:
      iterator in interface Iterable<T>
      Returns:
      RingBufferIterator
      Throws:
      NoSuchElementException - is HD is moved forward during the iteration
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • index

      protected int index(long seqno)
    • block

      protected boolean block(long seqno)
    • count

      protected int count(boolean missing)