Class RingBufferSeqno<T>
- All Implemented Interfaces:
Iterable<T>
The buffer has a fixed capacity, and a low (LOW), highest delivered (HD) and highest received (HR) seqno.
An element with a sequence number (seqno) > low + capacity or < HD will get discarded.
Elements are added after HD, but cannot wrap around beyond LOW. Addition doesn't need to be sequential, e.g. adding 5, 6, 8 is OK (as long as a seqno doesn't pass LOW). Addition may advance HR. Addition of elements that are already present is a no-op, and will not set the element again.
Removal of elements starts at HD+1; any non-null element is removed and HD is advanced accordingly. If a remove method is called with nullify=true, then removed elements are nulled and LOW is advanced as well (LOW=HD). Note that all removals in a given RingBuffer must either have nullify=true, or all must be false. It is not permitted to do some removals with nullify=true, and others with nullify=false, in the same RingBuffer.
The stable(long) method is called periodically; it nulls all elements between LOW and HD and advances LOW
to HD.
The design of RingBuffer is discussed in doc/design/RingBufferSeqno.txt.
- Since:
- 3.1
-
Nested Class Summary
Nested Classes -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final T[]Atomic ref array so that elements can be checked for null and set atomically.protected final Conditionprotected longThe highest delivered seqno.protected longThe highest received seqno.protected final LockLock for adders to block on when the buffer is fullprotected longThe lowest seqno.protected final longprotected final AtomicBooleanprotected boolean -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescription_get(long seqno) Only used for testing !!booleanbooleanAdds a new element to the bufferprotected booleanblock(long seqno) final intcapacity()protected intcount(boolean missing) voiddestroy()get(long seqno) get(long from, long to) Returns a list of messages in the range [from ..long[]longlonglonggetLow()protected intindex(long seqno) iterator()Returns an iterator over the elements of the ring buffer in the range [HD+1 ..intmissing()remove()Removes the next element (at hd +1).remove(boolean nullify) Removes the next element (at hd +1).removeMany(boolean nullify, int max_results) removeMany(AtomicBoolean processing, boolean nullify, int max_results) doublevoidsetHighestDelivered(long hd) intsize()intvoidstable(long seqno) Nulls elements between low and seqno and forwards lowtoString()Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface java.lang.Iterable
forEach, spliterator
-
Field Details
-
buf
Atomic ref array so that elements can be checked for null and set atomically. Should always be sized to a power of 2. -
low
protected long lowThe lowest seqno. Moved forward by stable() -
hd
protected long hdThe highest delivered seqno. Moved forward by a remove method. The next message to be removed is hd +1 -
hr
protected long hrThe highest received seqno. Moved forward by add(). The next message to be added is hr +1 -
offset
protected final long offset -
lock
Lock for adders to block on when the buffer is full -
buffer_full
-
running
protected boolean running -
processing
-
-
Constructor Details
-
RingBufferSeqno
public RingBufferSeqno(int capacity, long offset) Creates a RingBuffer- Parameters:
capacity- The number of elements the ring buffer's array should hold.offset- The offset. The first element to be added has to be offset +1.
-
-
Method Details
-
getLow
public long getLow() -
getHighestDelivered
public long getHighestDelivered() -
setHighestDelivered
public void setHighestDelivered(long hd) -
getHighestReceived
public long getHighestReceived() -
getDigest
public long[] getDigest() -
getProcessing
-
add
-
add
Adds a new element to the buffer- Parameters:
seqno- The seqno of the elementelement- The elementblock- If true, add() will block when the buffer is full until there is space. Else, add() will return immediately, either successfully or unsuccessfully (if the buffer is full)- Returns:
- True if the element was added, false otherwise.
-
remove
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !- Parameters:
nullify- Nulls the element in the array if true- Returns:
- T if there was a non-null element at position hd +1, or null if the element at hd+1 was null, or hd+1 > hr.
-
remove
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !- Returns:
- T if there was a non-null element at position hd +1, or null if the element at hd+1 was null.
-
removeMany
-
removeMany
-
get
-
_get
Only used for testing !! -
get
Returns a list of messages in the range [from .. to], including from and to- Parameters:
from-to-- Returns:
- A list of messages, or null if none in range [from .. to] was found
-
stable
public void stable(long seqno) Nulls elements between low and seqno and forwards low -
destroy
public void destroy() -
capacity
public final int capacity() -
size
public int size() -
missing
public int missing() -
spaceUsed
public int spaceUsed() -
saturation
public double saturation() -
getMissing
-
iterator
Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]- Specified by:
iteratorin interfaceIterable<T>- Returns:
- RingBufferIterator
- Throws:
NoSuchElementException- is HD is moved forward during the iteration
-
toString
-
index
protected int index(long seqno) -
block
protected boolean block(long seqno) -
count
protected int count(boolean missing)
-