Class Table<T>

java.lang.Object
org.jgroups.util.Table<T>
All Implemented Interfaces:
Iterable<T>

public class Table<T> extends Object implements Iterable<T>
A store for elements (typically messages) to be retransmitted or delivered. Used on sender and receiver side, as a replacement for HashMap. Table should use less memory than HashMap, as HashMap.Entry has 4 fields, plus arrays for storage.

Table maintains a matrix (an array of arrays) of elements, which are stored in the matrix by mapping their seqno to an index. E.g. when we have 10 rows of 1000 elements each, and first_seqno is 3000, then an element with seqno=5600 will be stored in the 3rd row (row index 2), at index 600.
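
The mapping in the example above can be checked with a few lines of arithmetic. This is a minimal sketch, not the code of Table itself: the class SeqnoMapping and its methods are hypothetical, and it assumes first_seqno is the seqno stored at matrix[0][0].

```java
// Hypothetical helper for illustration only; not part of org.jgroups.util.Table.
final class SeqnoMapping {
    /** Zero-based row that holds seqno, given the seqno stored at matrix[0][0]. */
    static int row(long seqno, long firstSeqno, int elementsPerRow) {
        return (int) ((seqno - firstSeqno) / elementsPerRow);
    }

    /** Position of seqno inside its row. */
    static int index(long seqno, long firstSeqno, int elementsPerRow) {
        return (int) ((seqno - firstSeqno) % elementsPerRow);
    }

    public static void main(String[] args) {
        // 5600 - 3000 = 2600, so row 2 (the 3rd row) and index 600
        System.out.println("row=" + row(5600, 3000, 1000) + " index=" + index(5600, 3000, 1000));
        // prints: row=2 index=600
    }
}
```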

Rows are removed when all elements in that row have been received.

Table started out as a copy of RetransmitTable, but is synchronized and maintains its own low, hd and hr pointers, so it can be used as a replacement for NakReceiverWindow. The idea is to put messages into Table, deliver them in order of seqnos, and periodically scan over all tables in NAKACK2 to do retransmission.

Version:
3.1
  • Field Details

    • num_rows

      protected final int num_rows
    • elements_per_row

      protected final int elements_per_row
      Must be a power of 2 for efficient modular arithmetic
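
      Why a power of 2 helps: division and modulo by a power of 2 can be replaced by a shift and a bit mask. The sketch below shows that equivalence for non-negative distances. The class and method names are hypothetical, and whether Table uses exactly these operations is an assumption.

```java
// Hypothetical illustration; the names below do not exist in JGroups.
final class PowerOfTwoIndexing {
    /** True if n is a positive power of 2. */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Same result as distance / elementsPerRow when elementsPerRow is a power of 2 and distance >= 0. */
    static int rowByShift(long distance, int elementsPerRow) {
        int bits = Integer.numberOfTrailingZeros(elementsPerRow);
        return (int) (distance >> bits);
    }

    /** Same result as distance % elementsPerRow when elementsPerRow is a power of 2 and distance >= 0. */
    static int indexByMask(long distance, int elementsPerRow) {
        return (int) (distance & (elementsPerRow - 1));
    }

    public static void main(String[] args) {
        long distance = 2600;   // seqno minus the seqno at matrix[0][0]
        int perRow = 1024;
        System.out.println(rowByShift(distance, perRow) == distance / perRow);   // prints: true
        System.out.println(indexByMask(distance, perRow) == distance % perRow);  // prints: true
    }
}
```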
    • resize_factor

      protected final double resize_factor
    • matrix

      protected T[][] matrix
    • offset

      protected long offset
      The first seqno, at matrix[0][0]
    • size

      protected int size
    • low

      protected long low
      The highest seqno purged
    • hr

      protected long hr
      The highest received seqno
    • hd

      protected long hd
      The highest delivered (= removed) seqno
    • max_compaction_time

      protected long max_compaction_time
      Time (in nanoseconds) after which a compaction should take place. 0 disables compaction
    • last_compaction_timestamp

      protected long last_compaction_timestamp
      The time when the last compaction took place. When compact() is called and the last compaction happened more than max_compaction_time nanoseconds ago, a compaction takes place
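
      The time check described for max_compaction_time and last_compaction_timestamp could look roughly like the sketch below. CompactionTimer is a hypothetical class. The millisecond-to-nanosecond conversion is an assumption based on the constructor parameter being documented in milliseconds while the field is documented in nanoseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the timing logic only; not the JGroups implementation.
final class CompactionTimer {
    private final long maxCompactionTimeNanos; // 0 disables compaction
    private long lastCompactionTimestamp;      // in nanoseconds, e.g. from System.nanoTime()

    CompactionTimer(long maxCompactionTimeMillis, long nowNanos) {
        this.maxCompactionTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxCompactionTimeMillis);
        this.lastCompactionTimestamp = nowNanos;
    }

    /** True if compaction is enabled and the last one is more than the max time ago. */
    boolean compactionDue(long nowNanos) {
        return maxCompactionTimeNanos > 0
            && nowNanos - lastCompactionTimestamp > maxCompactionTimeNanos;
    }

    /** Records that a compaction has just taken place. */
    void markCompacted(long nowNanos) {
        lastCompactionTimestamp = nowNanos;
    }

    public static void main(String[] args) {
        CompactionTimer timer = new CompactionTimer(10, System.nanoTime());
        System.out.println("due right away? " + timer.compactionDue(System.nanoTime()));
    }
}
```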
    • lock

      protected final Lock lock
    • adders

      protected final AtomicInteger adders
    • num_compactions

      protected int num_compactions
    • num_resizes

      protected int num_resizes
    • num_moves

      protected int num_moves
    • num_purges

      protected int num_purges
    • DEFAULT_MAX_COMPACTION_TIME

      protected static final long DEFAULT_MAX_COMPACTION_TIME
    • DEFAULT_RESIZE_FACTOR

      protected static final double DEFAULT_RESIZE_FACTOR
  • Constructor Details

    • Table

      public Table()
    • Table

      public Table(long offset)
    • Table

      public Table(int num_rows, int elements_per_row, long offset)
    • Table

      public Table(int num_rows, int elements_per_row, long offset, double resize_factor)
    • Table

      public Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
      Creates a new table
      Parameters:
      num_rows - the number of rows in the matrix
      elements_per_row - the number of elements per row
      offset - the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
      resize_factor - the factor with which to increase the number of rows
      max_compaction_time - the max time in milliseconds after which we attempt a compaction
  • Method Details

    • getAdders

      public AtomicInteger getAdders()
    • getOffset

      public long getOffset()
    • getElementsPerRow

      public int getElementsPerRow()
    • capacity

      public int capacity()
      Returns the total capacity in the matrix
    • getNumCompactions

      public int getNumCompactions()
    • getNumMoves

      public int getNumMoves()
    • getNumResizes

      public int getNumResizes()
    • getNumPurges

      public int getNumPurges()
    • size

      public int size()
      Returns an approximation of the number of elements in the table
    • isEmpty

      public boolean isEmpty()
    • getLow

      public long getLow()
    • getHighestDelivered

      public long getHighestDelivered()
    • getHighestReceived

      public long getHighestReceived()
    • getMaxCompactionTime

      public long getMaxCompactionTime()
    • setMaxCompactionTime

      public Table<T> setMaxCompactionTime(long max_compaction_time)
    • getNumRows

      public int getNumRows()
    • resetStats

      public void resetStats()
    • getHighestDeliverable

      public long getHighestDeliverable()
      Returns the highest deliverable (= removable) seqno. This may be higher than getHighestDelivered(), e.g. if elements have been added but not yet removed
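
      One way to picture the highest deliverable seqno: starting at hd, walk forward while the next seqno is present and stop at the first gap. The sketch below is an assumption about the semantics, not the actual implementation. DeliverableScan is hypothetical, and a Set of seqnos stands in for the matrix.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; a Set of received seqnos stands in for the matrix.
final class DeliverableScan {
    /** Highest seqno reachable from hd without crossing a missing element, capped at hr. */
    static long highestDeliverable(Set<Long> received, long hd, long hr) {
        long seqno = hd;
        while (seqno < hr && received.contains(seqno + 1))
            seqno++;
        return seqno;
    }

    public static void main(String[] args) {
        Set<Long> received = new HashSet<>(Arrays.asList(1L, 2L, 3L, 5L)); // 4 is missing
        System.out.println(highestDeliverable(received, 0, 5)); // prints: 3
    }
}
```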
    • getNumDeliverable

      public int getNumDeliverable()
      Returns the number of messages that can be delivered
    • setHighestDelivered

      public Table<T> setHighestDelivered(long seqno)
      Only used internally by JGroups on a state transfer. Please don't use this in application code, or you're on your own!
      Parameters:
      seqno -
    • add

      public boolean add(long seqno, T element)
      Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
      Parameters:
      seqno -
      element -
      Returns:
      True if the element at the computed index was null, else false
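
      The add-if-absent contract can be sketched with a flat array. AddIfAbsentSlots is a hypothetical class. Unlike Table it does not resize, and it simply rejects seqnos outside its fixed range, which is a simplification.

```java
// Hypothetical sketch of the "add only if the slot is null" contract; no resizing.
final class AddIfAbsentSlots<T> {
    private final Object[] slots;
    private final long offset; // seqno stored at slots[0]

    AddIfAbsentSlots(long offset, int capacity) {
        this.offset = offset;
        this.slots = new Object[capacity];
    }

    /** Stores element at seqno if the slot is empty; returns false and keeps the old value otherwise. */
    boolean add(long seqno, T element) {
        long idx = seqno - offset;
        if (idx < 0 || idx >= slots.length)
            return false; // out of range in this sketch
        if (slots[(int) idx] != null)
            return false; // already present: do not overwrite
        slots[(int) idx] = element;
        return true;
    }

    @SuppressWarnings("unchecked")
    T get(long seqno) {
        long idx = seqno - offset;
        return idx < 0 || idx >= slots.length ? null : (T) slots[(int) idx];
    }

    public static void main(String[] args) {
        AddIfAbsentSlots<String> table = new AddIfAbsentSlots<>(1, 8);
        System.out.println(table.add(1, "first"));  // prints: true
        System.out.println(table.add(1, "second")); // prints: false
        System.out.println(table.get(1));           // prints: first
    }
}
```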
    • add

      public boolean add(long seqno, T element, Predicate<T> remove_filter)
      Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
      Parameters:
      seqno -
      element -
      remove_filter - If not null, a filter used to remove all consecutive messages passing the filter
      Returns:
      True if the element at the computed index was null, else false
    • add

      public boolean add(List<LongTuple<T>> list)
      Adds elements from list to the table
      Parameters:
      list -
      Returns:
      True if at least 1 element was added successfully
    • add

      public boolean add(List<LongTuple<T>> list, boolean remove_added_elements)
      Adds elements from list to the table, removes elements from list that were not added to the table
      Parameters:
      list -
      Returns:
      True if at least 1 element was added successfully. This guarantees that the list has at least 1 element
    • add

      public boolean add(List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
      Adds elements from the list to the table
      Parameters:
      list - The list of tuples of seqnos and elements. If remove_added_elements is true, elements that could not be added to the table (e.g. because they were already present or the seqno was < HD) will be removed from list
      remove_added_elements - If true, elements that could not be added to the table are removed from list
      const_value - If non-null, this value should be used rather than the values of the list tuples
      Returns:
      True if at least 1 element was added successfully, false otherwise.
    • add

      public boolean add(MessageBatch batch, Function<T,Long> seqno_getter)
    • add

      public boolean add(MessageBatch batch, Function<T,Long> seqno_getter, boolean remove_from_batch, T const_value)
      Adds all messages from the given batch to the table
      Parameters:
      batch - The batch
      seqno_getter - A function to return the sequence number (seqno) of a given Message. Must be non-null. If the function returns -1, then the message won't be added
      remove_from_batch - If true, the message is removed regardless of whether it was added successfully or not
      const_value - If non-null, this value should be used rather than the messages of the batch
      Returns:
      True if at least 1 element was added successfully, false otherwise.
    • get

      public T get(long seqno)
      Returns an element at seqno
      Parameters:
      seqno -
      Returns:
    • _get

      public T _get(long seqno)
      To be used only for testing; doesn't do any index or sanity checks
      Parameters:
      seqno -
      Returns:
    • remove

      public T remove()
    • remove

      public T remove(boolean nullify)
      Removes the next non-null element and nulls the index if nullify=true
    • removeMany

      public List<T> removeMany(boolean nullify, int max_results)
    • removeMany

      public List<T> removeMany(boolean nullify, int max_results, Predicate<T> filter)
    • removeMany

      public <R> R removeMany(boolean nullify, int max_results, Predicate<T> filter, Supplier<R> result_creator, BiConsumer<R,T> accumulator)
      Removes elements from the table and adds them to the result created by result_creator. Between 0 and max_results elements are removed. If no elements were removed, processing will be set to true while the table lock is held.
      Type Parameters:
      R - the type of the result
      Parameters:
      nullify - if true, the x,y location of the removed element in the matrix will be nulled
      max_results - the max number of results to be returned, even if more elements would be removable
      filter - a filter which accepts (or rejects) elements into the result. If null, all elements will be accepted
      result_creator - a supplier required to create the result, e.g. ArrayList::new
      accumulator - an accumulator accepting the result and an element, e.g. ArrayList::add
      Returns:
      the result
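
      The result_creator / accumulator pair works like a small collector. The sketch below shows the pattern on a plain Deque of deliverable elements. RemoveManySketch is hypothetical, and it makes two simplifying assumptions: the result is created up front (so an empty result is returned rather than null), and max_results bounds the number of accepted elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of the supplier/accumulator pattern; not the Table implementation.
final class RemoveManySketch {
    static <T, R> R removeMany(Deque<T> deliverable, int maxResults, Predicate<T> filter,
                               Supplier<R> resultCreator, BiConsumer<R, T> accumulator) {
        R result = resultCreator.get();
        int accepted = 0;
        while (accepted < maxResults && !deliverable.isEmpty()) {
            T element = deliverable.poll();              // removed whether or not the filter accepts it
            if (filter == null || filter.test(element)) {
                accumulator.accept(result, element);
                accepted++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Deque<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> batch = removeMany(queue, 3, null, ArrayList::new, List::add);
        System.out.println(batch); // prints: [1, 2, 3]
        System.out.println(queue); // prints: [4, 5]
    }
}
```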
    • purge

      public void purge(long seqno)
      Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed
      Parameters:
      seqno -
    • purge

      public void purge(long seqno, boolean force)
      Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed.
      Parameters:
      seqno - All elements <= seqno will be nulled
      force - If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno.
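
      The effect of force on the low and hd pointers can be sketched on a flat array. PurgeSketch is hypothetical and nulls elements one by one instead of dropping whole rows. Clamping seqno to hd when force is false is an assumption; the documentation above only states the behavior for force=true.

```java
// Hypothetical sketch of purge semantics on a flat array; Table nulls entire rows instead.
final class PurgeSketch {
    final Object[] slots; // slots[i] holds the element with seqno offset + i
    final long offset;
    long low, hd, hr;     // highest purged, highest delivered, highest received

    PurgeSketch(long offset, int capacity) {
        this.offset = offset;
        this.slots = new Object[capacity];
        this.low = this.hd = this.hr = offset - 1;
    }

    void put(long seqno, Object element) {
        slots[(int) (seqno - offset)] = element;
        if (seqno > hr) hr = seqno;
    }

    Object get(long seqno) {
        return slots[(int) (seqno - offset)];
    }

    void purge(long seqno, boolean force) {
        if (seqno <= low) return;
        long limit = force ? hr : hd;            // assumption: without force, never purge past hd
        if (seqno > limit) seqno = limit;
        for (long s = low + 1; s <= seqno; s++)  // null everything in (low, seqno]
            slots[(int) (s - offset)] = null;
        if (seqno > low) low = seqno;
        if (force && seqno > hd) hd = seqno;     // force: hd = low = seqno
    }

    public static void main(String[] args) {
        PurgeSketch p = new PurgeSketch(1, 10);
        for (long s = 1; s <= 5; s++) p.put(s, "m" + s);
        p.hd = 3;
        p.purge(5, false);
        System.out.println("low=" + p.low + " hd=" + p.hd); // prints: low=3 hd=3
        p.purge(5, true);
        System.out.println("low=" + p.low + " hd=" + p.hd); // prints: low=5 hd=5
    }
}
```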
    • compact

      public void compact()
    • forEach

      public void forEach(long from, long to, Table.Visitor<T> visitor)
      Iterates over the matrix with range [from .. to] (including from and to), and calls Table.Visitor.visit(long,Object,int,int). If the visit() method returns false, the iteration is terminated.

      This method must be called with the lock held

      Parameters:
      from - The starting seqno
      to - The ending seqno, the range is [from .. to] including from and to
      visitor - An instance of Visitor
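
      A visitor that stops the iteration by returning false might be used as in the sketch below. VisitorSketch and its nested Visitor interface are hypothetical stand-ins that mirror the documented visit(long,Object,int,int) shape. The sketch assumes [from .. to] lies inside the matrix and ignores locking.

```java
// Hypothetical sketch of visitor-style iteration with early termination.
final class VisitorSketch {
    @FunctionalInterface
    interface Visitor<T> {
        /** Return false to stop the iteration. */
        boolean visit(long seqno, T element, int row, int column);
    }

    /** Visits every seqno in [from .. to]; offset is the seqno stored at matrix[0][0]. */
    static <T> void forEach(T[][] matrix, long offset, long from, long to, Visitor<T> visitor) {
        int perRow = matrix[0].length;
        for (long seqno = from; seqno <= to; seqno++) {
            long distance = seqno - offset;
            int row = (int) (distance / perRow);
            int col = (int) (distance % perRow);
            T element = matrix[row] == null ? null : matrix[row][col];
            if (!visitor.visit(seqno, element, row, col))
                break;
        }
    }

    public static void main(String[] args) {
        String[][] matrix = { { "a", "b" }, { "c", null } };
        StringBuilder seen = new StringBuilder();
        // Collect elements until the first gap (null) is reached
        forEach(matrix, 10, 10, 13, (seqno, el, row, col) -> {
            if (el == null) return false;
            seen.append(el);
            return true;
        });
        System.out.println(seen); // prints: abc
    }
}
```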
    • iterator

      public Iterator<T> iterator()
      Specified by:
      iterator in interface Iterable<T>
    • iterator

      public Iterator<T> iterator(long from, long to)
    • stream

      public Stream<T> stream()
    • stream

      public Stream<T> stream(long from, long to)
    • _add

      protected boolean _add(long seqno, T element, boolean check_if_resize_needed, Predicate<T> remove_filter)
    • findHighestSeqno

      protected long findHighestSeqno(List<LongTuple<T>> list)
    • findHighestSeqno

      protected static <T> long findHighestSeqno(MessageBatch batch, Function<T,Long> seqno_getter)
    • resize

      protected void resize(long seqno)
      Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place). The lock must be held by the caller of resize().
    • move

      protected void move(int num_rows)
      Moves contents of matrix num_rows down. Avoids a System.arraycopy(). Caller must hold the lock.
    • _compact

      protected void _compact()
      Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The capacity of the matrix should be size * resize_factor. Caller must hold the lock.
    • computeSize

      public int computeSize()
      Iterate from low to hr and add up non-null values. Caller must hold the lock.
    • getNumMissing

      public int getNumMissing()
      Returns the number of null elements in the range [hd+1 .. hr-1] excluding hd and hr
    • getMissing

      public SeqnoList getMissing()
      Returns a list of missing (= null) elements
      Returns:
      A SeqnoList of missing messages, or null if no messages are missing
    • getMissing

      public SeqnoList getMissing(int max_msgs)
      Returns a list of missing messages
      Parameters:
      max_msgs - If > 0, the max number of missing messages to be returned (oldest first), else no limit
      Returns:
      A SeqnoList of missing messages, or null if no messages are missing
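
      Finding missing seqnos amounts to scanning the range [hd+1 .. hr-1] for gaps. The sketch below is hypothetical: a Set of received seqnos stands in for the matrix, and a List of Long stands in for SeqnoList. It follows the documented contract of returning null when nothing is missing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; List<Long> stands in for SeqnoList.
final class MissingScan {
    /** Missing seqnos in [hd+1 .. hr-1], oldest first; at most maxMsgs entries if maxMsgs > 0. */
    static List<Long> getMissing(Set<Long> received, long hd, long hr, int maxMsgs) {
        List<Long> missing = new ArrayList<>();
        for (long seqno = hd + 1; seqno < hr; seqno++) {
            if (!received.contains(seqno)) {
                missing.add(seqno);
                if (maxMsgs > 0 && missing.size() >= maxMsgs)
                    break;
            }
        }
        return missing.isEmpty() ? null : missing;
    }

    public static void main(String[] args) {
        Set<Long> received = new HashSet<>(Arrays.asList(1L, 2L, 4L, 7L));
        System.out.println(getMissing(received, 2, 7, 0)); // prints: [3, 5, 6]
        System.out.println(getMissing(received, 2, 7, 2)); // prints: [3, 5]
    }
}
```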
    • getDigest

      public long[] getDigest()
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • dump

      public String dump()
      Dumps the seqnos in the table as a list
    • getRow

      protected T[] getRow(int index)
      Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
      Parameters:
      index -
      Returns:
      A row
    • computeRow

      protected int computeRow(long seqno)
      Computes and returns the row index for seqno. The caller must hold the lock.
    • computeIndex

      protected int computeIndex(long seqno)
      Computes and returns the index within a row for seqno