Class RequestCorrelator

java.lang.Object
org.jgroups.blocks.RequestCorrelator

public class RequestCorrelator extends Object
Framework for sending requests and receiving the matching responses, correlated by request ID. Multiple requests can be in flight at a time. Whenever a response is received, the matching Request is looked up (key = request ID) and its receiveResponse() method is invoked.
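The correlation idea described above can be illustrated with a small, library-free sketch. It is a simplified model under stated assumptions, not the JGroups implementation: the names CorrelationSketch, PendingRequest, send and onResponse are hypothetical, and the request is modelled as a CompletableFuture rather than the real Request class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of request/response correlation; NOT the JGroups implementation.
class CorrelationSketch {
    // Hypothetical stand-in for Request: completes when a response arrives.
    static final class PendingRequest {
        final CompletableFuture<Object> result = new CompletableFuture<>();
        void receiveResponse(Object value) { result.complete(value); }
    }

    private final AtomicLong nextId = new AtomicLong(1);
    private final Map<Long, PendingRequest> requests = new ConcurrentHashMap<>();

    // Registers the request under a fresh ID; the ID would travel in the message header.
    long send(PendingRequest req) {
        long id = nextId.getAndIncrement();
        requests.put(id, req);
        return id;
    }

    // Looks up the request by ID and hands it the response; unknown IDs are ignored.
    boolean onResponse(long id, Object value) {
        PendingRequest req = requests.get(id);
        if (req == null)
            return false;
        req.receiveResponse(value);
        return true;
    }

    // Mirrors the intent of done(long): drop the entry once all responses are in.
    void done(long id) { requests.remove(id); }

    int pending() { return requests.size(); }

    public static void main(String[] args) {
        CorrelationSketch corr = new CorrelationSketch();
        PendingRequest a = new PendingRequest(), b = new PendingRequest();
        long idA = corr.send(a), idB = corr.send(b);
        corr.onResponse(idB, "reply-b"); // responses may arrive in any order
        corr.onResponse(idA, "reply-a");
        System.out.println(a.result.join() + " " + b.result.join());
        corr.done(idA);
        corr.done(idB);
    }
}
```

Keying the table by a monotonically increasing ID is what allows several requests to be outstanding at once: each response carries the ID of the request it answers, so arrival order does not matter.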
  • Field Details

    • down_prot

      protected Protocol down_prot
      The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport
    • requests

      protected final Map<Long,Request<?>> requests
      The table of pending requests (keys=Long (request IDs), values=Request)
    • REQUEST_ID

      protected static final AtomicLong REQUEST_ID
      To generate unique request IDs
    • request_handler

      protected RequestHandler request_handler
      The handler for the incoming requests. It is called from inside the dispatcher thread
    • corr_id

      protected short corr_id
      Makes the instance unique (together with request IDs)
    • local_addr

      protected Address local_addr
      The address of this group member
    • view

      protected volatile View view
    • started

      protected boolean started
    • async_dispatching

      protected boolean async_dispatching
      Whether or not to use async dispatcher
    • wrap_exceptions

      protected boolean wrap_exceptions
    • async_rsp_handling

      protected boolean async_rsp_handling
      When enabled, responses are handled by the common ForkJoinPool (https://issues.redhat.com/browse/JGRP-2644)
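As a rough illustration of what handing responses to the common ForkJoinPool means, the following sketch switches between caller-thread and pool delivery based on a flag. It is only a model under assumptions: AsyncRspSketch and deliver are hypothetical names, and the real class may dispatch differently.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

// Illustrative only: models the effect of an async_rsp_handling style flag.
class AsyncRspSketch {
    private final ForkJoinPool commonPool = ForkJoinPool.commonPool();
    private final boolean asyncRspHandling;

    AsyncRspSketch(boolean asyncRspHandling) {
        this.asyncRspHandling = asyncRspHandling;
    }

    // Runs the handler on the common pool when the flag is set,
    // otherwise directly on the caller's thread.
    void deliver(String rsp, Consumer<String> handler) {
        if (asyncRspHandling)
            commonPool.execute(() -> handler.accept(rsp));
        else
            handler.accept(rsp);
    }
}
```

Offloading keeps the thread that delivered the message free for further work, at the cost of losing ordering between responses processed on the pool.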
    • probe_handler

      protected final RequestCorrelator.MyProbeHandler probe_handler
    • common_pool

      protected final ForkJoinPool common_pool
    • rpc_stats

      protected final RpcStats rpc_stats
    • avg_req_delivery

      protected final AverageMinMax avg_req_delivery
    • avg_rsp_delivery

      protected final AverageMinMax avg_rsp_delivery
    • log

      protected static final Log log
  • Constructor Details

    • RequestCorrelator

      public RequestCorrelator(Protocol down_prot, RequestHandler handler, Address local_addr)
      Constructor. Uses down_prot to send messages. If handler is not null, all incoming requests will be dispatched to it (via handleRequest(Message, Header)).
      Parameters:
      down_prot - Used to send requests or responses.
      handler - Request handler. Method handle(Message) will be called when a request is received.
      local_addr - The address of this member
  • Method Details

    • setRequestHandler

      public void setRequestHandler(RequestHandler handler)
    • getLocalAddress

      public Address getLocalAddress()
    • setLocalAddress

      public RequestCorrelator setLocalAddress(Address a)
    • asyncDispatching

      public boolean asyncDispatching()
    • asyncDispatching

      public RequestCorrelator asyncDispatching(boolean flag)
    • asyncRspHandling

      public boolean asyncRspHandling()
    • asyncRspHandling

      public RequestCorrelator asyncRspHandling(boolean f)
    • wrapExceptions

      public boolean wrapExceptions()
    • wrapExceptions

      public RequestCorrelator wrapExceptions(boolean flag)
    • sendMulticastRequest

      public <T> void sendMulticastRequest(Collection<Address> dest_mbrs, Message msg, Request<T> req, RequestOptions opts) throws Exception
      Sends a request to a group. If no response collector is given, no responses are expected (making the call asynchronous)
      Parameters:
      dest_mbrs - The list of members who should receive the call. Usually a group RPC is sent via multicast, but a receiver drops the request if its own address is not in this list. Will not be used if it is null.
      msg - The message to be sent.
      req - A request (usually the object that invokes this method). Its methods receiveResponse() and suspect() will be invoked when a message has been received or a member is suspected.
      Throws:
      Exception
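The receiver-side rule described for dest_mbrs (drop the request unless the local address is listed; a null list disables the check) can be sketched as follows. This is a minimal sketch in which addresses are modelled as plain strings, and DestFilterSketch and accepts are hypothetical names rather than part of the API.

```java
import java.util.Collection;

// Illustrative only: models the dest_mbrs check on the receiving side.
class DestFilterSketch {
    // Returns true if a receiver at 'self' should process a multicast request.
    // A null list means "no filter", matching the dest_mbrs description.
    static boolean accepts(Collection<String> destMbrs, String self) {
        return destMbrs == null || destMbrs.contains(self);
    }
}
```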
    • sendUnicastRequest

      public <T> void sendUnicastRequest(Message msg, Request<T> req, RequestOptions opts) throws Exception
      Sends a request to a single destination
      Throws:
      Exception
    • done

      public void done(long id)
      Used to signal that a certain request may be garbage collected as all responses have been received
    • receive

      public boolean receive(Event evt)
      Callback.

      Called by the protocol below when a message has been received. The algorithm should test whether the message is destined for us and, if not, pass it up to the next layer. Otherwise, it should remove the header and check whether the message is a request or a response. If it is a request, the message is delivered to the registered request handler (by calling its handle() method); if it is a response, the corresponding response collector is looked up and the message is delivered to it.

      Parameters:
      evt - The event to be received
      Returns:
      Whether the event was consumed. If true, the message is not passed up; otherwise it is passed up
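The decision steps described for receive() can be sketched in a self-contained form. Everything here is an assumption made for illustration: the Header and Msg types, the corrId check used to decide whether a message is "for us", and the lists that stand in for the request handler and the response collectors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a toy version of the receive() decision steps.
class ReceiveSketch {
    enum Type { REQ, RSP }

    // Hypothetical header: message kind, request ID, and ID of the owning correlator.
    static final class Header {
        final Type type;
        final long reqId;
        final short corrId;
        Header(Type type, long reqId, short corrId) {
            this.type = type;
            this.reqId = reqId;
            this.corrId = corrId;
        }
    }

    // Hypothetical message: an optional header plus a payload.
    static final class Msg {
        final Header hdr;
        final String payload;
        Msg(Header hdr, String payload) {
            this.hdr = hdr;
            this.payload = payload;
        }
    }

    final short corrId;
    final List<String> handledRequests = new ArrayList<>();    // stands in for the request handler
    final Map<Long, List<String>> responses = new HashMap<>(); // stands in for response collectors

    ReceiveSketch(short corrId) { this.corrId = corrId; }

    // Registers a collector for an outstanding request.
    void expect(long reqId) { responses.put(reqId, new ArrayList<>()); }

    // Returns true if the message was consumed, false if it should be passed up.
    boolean receive(Msg msg) {
        Header hdr = msg.hdr;
        if (hdr == null || hdr.corrId != corrId)
            return false;                      // not ours: let the next layer see it
        if (hdr.type == Type.REQ) {
            handledRequests.add(msg.payload);  // request: hand to the handler
        } else {
            List<String> collector = responses.get(hdr.reqId);
            if (collector != null)
                collector.add(msg.payload);    // response: hand to the matching collector
        }
        return true;
    }
}
```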
    • start

      public final void start()
    • stop

      public void stop()
    • registerProbeHandler

      public void registerProbeHandler(TP transport)
    • unregisterProbeHandler

      public void unregisterProbeHandler(TP transport)
    • setSiteUnreachable

      public void setSiteUnreachable(String site)
      An entire site is down; mark all requests that point to that site as unreachable (used by RELAY2)
    • setMemberUnreachable

      public void setMemberUnreachable(Address mbr)
    • receiveView

      public void receiveView(View new_view)
      View received: mark all responses from members that are not in new_view as suspected
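To make the effect of a view change concrete, the sketch below tracks per-member response state for a single pending group request. It rests on assumptions that the description does not confirm: members are plain strings, only still-pending slots are marked as suspected, and ViewChangeSketch, Status and viewChange are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only: per-member response state of one pending group request.
class ViewChangeSketch {
    enum Status { PENDING, RECEIVED, SUSPECTED }

    final Map<String, Status> rsps = new LinkedHashMap<>();

    ViewChangeSketch(String... targets) {
        for (String t : targets)
            rsps.put(t, Status.PENDING);
    }

    // Records a response; senders that were never targeted are ignored.
    void receiveResponse(String sender) { rsps.replace(sender, Status.RECEIVED); }

    // Members missing from the new view can no longer answer:
    // mark their pending slots as suspected (assumption: received ones are kept).
    void viewChange(Set<String> newView) {
        rsps.replaceAll((mbr, st) ->
            st == Status.PENDING && !newView.contains(mbr) ? Status.SUSPECTED : st);
    }

    // The request is complete once no member is still pending.
    boolean isDone() { return !rsps.containsValue(Status.PENDING); }
}
```

Marking departed members as suspected lets a blocked caller finish instead of waiting for a response that can never arrive.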
    • receiveMessage

      public boolean receiveMessage(Message msg)
      Handles a message coming from a layer below
      Returns:
      true if the message was consumed (and should not be passed further up), false otherwise
    • receiveMessageBatch

      public void receiveMessageBatch(MessageBatch batch)
    • iterate

      protected void iterate(MessageBatch batch, boolean skip_excluded_msgs, boolean process_reqs, boolean process_rsps)
    • skip

      protected boolean skip(RequestCorrelator.Header hdr, Address sender)
    • sendAnycastRequest

      protected void sendAnycastRequest(Message req, Collection<Address> dest_mbrs)
    • addEntry

      protected <T> void addEntry(Request<T> req, RequestCorrelator.Header hdr, boolean unicast)
    • removeEntry

      protected RequestCorrelator removeEntry(long req_id)
    • dispatch

      protected void dispatch(Message msg, RequestCorrelator.Header hdr)
    • handleRequest

      protected void handleRequest(Message req, RequestCorrelator.Header hdr)
      Handle a request msg for this correlator
    • handleResponse

      protected void handleResponse(Message rsp, RequestCorrelator.Header hdr)
    • sendReply

      protected void sendReply(Message req, long req_id, Object reply, boolean is_exception)
    • makeReply

      protected static Message makeReply(Message msg)
    • sendResponse

      protected void sendResponse(Message rsp, long req_id, boolean is_exception)