Class TransferQueueBundler2

java.lang.Object
org.jgroups.protocols.TransferQueueBundler2
All Implemented Interfaces:
Runnable, Bundler

public class TransferQueueBundler2 extends Object implements Bundler, Runnable
This bundler adds all messages (unicast or multicast) to a queue until the maximum size has been exceeded, but sends messages immediately when no other messages are available. See https://issues.redhat.com/browse/JGRP-1540

The difference from TransferQueueBundler is that a size is maintained per destination, and a byte array of max_bundle_size is kept per destination, into which a message is marshalled directly when it is sent.
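
The per-destination buffering described above can be illustrated with a minimal sketch. This is not the JGroups implementation: the class PerDestinationBufferSketch, its methods, the use of String as a destination key, and the list of recorded "sends" are all hypothetical, and only standard JDK classes are used. It assumes that a buffer is flushed before a message that would no longer fit is written into it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not the JGroups TransferQueueBundler2 code. */
final class PerDestinationBufferSketch {
    private final int maxSize;                                       // plays the role of max_size
    private final Map<String, ByteBuffer> buffers = new HashMap<>(); // one fixed-size buffer per destination
    private final List<String> sent = new ArrayList<>();             // records "destination:bytes" per flush

    PerDestinationBufferSketch(int maxSize) {
        if (maxSize <= 0)
            throw new IllegalArgumentException("maxSize must be positive");
        this.maxSize = maxSize;
    }

    /** Writes the payload into the destination's buffer, flushing first if it would not fit. */
    void add(String dest, byte[] payload) {
        if (payload.length > maxSize)
            throw new IllegalArgumentException("payload exceeds maxSize");
        ByteBuffer buf = buffers.computeIfAbsent(dest, d -> ByteBuffer.allocate(maxSize));
        if (buf.remaining() < payload.length)
            flush(dest, buf);   // size limit would be exceeded: send what has accumulated
        buf.put(payload);       // write the payload directly into the per-destination array
    }

    /** Flushes every non-empty buffer, e.g. when no further messages are waiting. */
    void flushAll() {
        buffers.forEach(this::flush);
    }

    private void flush(String dest, ByteBuffer buf) {
        if (buf.position() == 0)
            return;             // nothing accumulated for this destination
        sent.add(dest + ":" + buf.position());
        buf.clear();            // reuse the same backing array for the next bundle
    }

    List<String> sent() {
        return sent;
    }
}
```

In this sketch the size check is made per destination, so a full buffer for one destination does not force a send for the others.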
  • Field Details

    • max_size

      protected int max_size
      Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller than the largest datagram packet size in case of UDP.
    • capacity

      protected int capacity
    • poll_timeout

      protected long poll_timeout
    • transport

      protected TP transport
    • log

      protected Log log
    • queue

      protected BlockingQueue<Message> queue
    • remove_queue

      protected List<Message> remove_queue
    • bundler_thread

      protected volatile Thread bundler_thread
    • running

      protected volatile boolean running
    • num_sends_because_full_queue

      protected long num_sends_because_full_queue
    • num_sends_because_no_msgs

      protected long num_sends_because_no_msgs
    • avg_fill_count

      protected final AverageMinMax avg_fill_count
    • THREAD_NAME

      protected static final String THREAD_NAME
    • messages

      protected final Map<Address,TransferQueueBundler2.Buffer> messages
    • NIL

      protected static final NullAddress NIL
  • Constructor Details

    • TransferQueueBundler2

      public TransferQueueBundler2()
    • TransferQueueBundler2

      protected TransferQueueBundler2(BlockingQueue<Message> queue)
    • TransferQueueBundler2

      public TransferQueueBundler2(int capacity)
  • Method Details

    • getCapacity

      public int getCapacity()
      Description copied from interface: Bundler
      If the bundler implementation supports a capacity (e.g. RingBufferBundler), then return it, else return -1
      Specified by:
      getCapacity in interface Bundler
    • setCapacity

      public Bundler setCapacity(int c)
    • getMaxSize

      public int getMaxSize()
      Description copied from interface: Bundler
      Maximum number of bytes for messages to be queued until they are sent
      Specified by:
      getMaxSize in interface Bundler
    • setMaxSize

      public Bundler setMaxSize(int s)
      Specified by:
      setMaxSize in interface Bundler
    • getQueueSize

      public int getQueueSize()
      Description copied from interface: Bundler
      If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.

      This method needs to be fast as it might get called on every message to be sent.

      Specified by:
      getQueueSize in interface Bundler
    • removeQueueSize

      public int removeQueueSize()
    • removeQueueSize

      public TransferQueueBundler2 removeQueueSize(int size)
    • dump

      public String dump()
    • init

      public void init(TP transport)
      Description copied from interface: Bundler
      Called after creation of the bundler
      Specified by:
      init in interface Bundler
      Parameters:
      transport - the transport, for further reference
    • resetStats

      public void resetStats()
      Specified by:
      resetStats in interface Bundler
    • viewChange

      public void viewChange(View view)
      Specified by:
      viewChange in interface Bundler
    • start

      public void start()
      Description copied from interface: Bundler
      Called after Bundler.init(TP)
      Specified by:
      start in interface Bundler
    • stop

      public void stop()
      Specified by:
      stop in interface Bundler
    • renameThread

      public void renameThread()
      Specified by:
      renameThread in interface Bundler
    • size

      public int size()
      Description copied from interface: Bundler
      The number of unsent messages in the bundler
      Specified by:
      size in interface Bundler
    • send

      public void send(Message msg) throws Exception
      Specified by:
      send in interface Bundler
      Throws:
      Exception
    • run

      public void run()
      Specified by:
      run in interface Runnable
    • hasMessages

      protected boolean hasMessages()
    • addAndSendIfSizeExceeded

      protected void addAndSendIfSizeExceeded(Message msg)
    • sendBundledMessages

      protected void sendBundledMessages()
    • drain

      protected void drain()
      Takes all messages from the queue, adds them to the hashmap and then sends all bundled messages
    • assertPositive

      protected static int assertPositive(int value, String message)
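
The behavior described for drain() above (take all messages from the queue, add them to the map, then send all bundled messages) can be sketched as follows. This is an assumption-laden illustration, not the JGroups source: DrainSketch, its nested Msg class, and the recorded "sends" are hypothetical, and the use of BlockingQueue.drainTo is a guess at one reasonable way to empty the queue in a single step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration only; not the JGroups TransferQueueBundler2 code. */
final class DrainSketch {
    /** Hypothetical stand-in for a message: a destination name plus a payload. */
    static final class Msg {
        final String dest;
        final String payload;
        Msg(String dest, String payload) { this.dest = dest; this.payload = payload; }
    }

    private final BlockingQueue<Msg> queue = new ArrayBlockingQueue<>(16);
    private final List<Msg> removeQueue = new ArrayList<>();                 // scratch list reused across drains
    private final Map<String, List<String>> byDest = new LinkedHashMap<>();  // messages grouped per destination
    private final List<String> sent = new ArrayList<>();                     // records one entry per bundle sent

    boolean offer(Msg m) {
        return queue.offer(m);
    }

    /** Empties the queue, groups the messages by destination, then sends every bundle. */
    void drain() {
        removeQueue.clear();
        queue.drainTo(removeQueue);   // removes all currently available elements in one call
        for (Msg m : removeQueue)
            byDest.computeIfAbsent(m.dest, d -> new ArrayList<>()).add(m.payload);
        sendBundled();
    }

    private void sendBundled() {
        byDest.forEach((dest, payloads) -> {
            if (!payloads.isEmpty()) {
                sent.add(dest + "=" + String.join(",", payloads));
                payloads.clear();     // keep the per-destination list for reuse
            }
        });
    }

    List<String> sent() {
        return sent;
    }
}
```

Grouping by destination before sending means each destination receives one bundle per drain, regardless of how the messages were interleaved in the queue.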