
Using RPC with messaging

Remote Procedure Calls (RPC) are commonly used in client-server architectures. The client asks the server to perform some action, and then waits for the server's reply.

The messaging paradigm tries to enforce a totally different approach with the fire-and-forget messaging style, but it is possible to use properly designed AMQP queues to perform and enhance RPC, as shown in the following figure:

[Figure: Using RPC with messaging]

In the figure, the request queue is associated with the responder, and the reply queues are associated with the callers.

However, when we use RabbitMQ, all the involved peers (both the callers and the responders) are AMQP clients.

We are now going to describe the steps performed in the example in Chapter01/Recipe05/Java_5/src/rmqexample/rpc.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

Let's perform the following steps to implement the RPC responder:

  1. Declare the request queue where the responder will be waiting for the RPC requests:
    channel.queueDeclare(requestQueue, false, false, false, null);
  2. Define our specialized consumer RpcResponderConsumer by overriding DefaultConsumer.handleDelivery() as already seen in the Consuming messages recipe. On the reception of each RPC request, this consumer will:
    • Perform the action required in the RPC request
    • Prepare the reply message
    • Set the correlation ID in the reply properties by using the following code:
        BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
    • Publish the answer on the reply queue:
        getChannel().basicPublish("", properties.getReplyTo(), replyProperties, reply.getBytes());
      
    • Send the ack to the RPC request:
        getChannel().basicAck(envelope.getDeliveryTag(), false);
  3. Start consuming messages, until we stop it as already seen in the Consuming messages recipe.

Now let's perform the following steps to implement the RPC caller:

  1. Declare the request queue where the responder will be waiting for the RPC requests:
    channel.queueDeclare(requestQueue, false, false, false, null);
    
  2. Create a temporary, private, auto-delete reply queue with a server-generated name:
    String replyQueue = channel.queueDeclare().getQueue();
  3. Define our specialized consumer RpcCallerConsumer, which will take care of receiving and handling RPC replies. It will:
    • Let us specify what to do when it gets the replies (in our example, by defining AddAction())
    • Override handleDelivery():
        @Override
        public void handleDelivery(String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws java.io.IOException {

          // Look up, then forget, the action registered for this call
          String messageIdentifier = properties.getCorrelationId();
          String action = actions.get(messageIdentifier);
          actions.remove(messageIdentifier);

          // Hand the reply body over to the registered action
          String response = new String(body);
          OnReply(action, response);
        }
  4. Start consuming reply messages by invoking channel.basicConsume().
  5. Prepare and serialize the requests (messageRequest in our example).
  6. Initialize an arbitrary, unique message identifier (messageIdentifier).
  7. Define what to do when the consumer gets the corresponding reply, by binding the action with the messageIdentifier. In our example we do it by calling our custom method consumer.AddAction().
  8. Publish the message to requestQueue, setting its properties:
    BasicProperties props = new BasicProperties.Builder()
        .correlationId(messageIdentifier)
        .replyTo(replyQueue).build();
    channel.basicPublish("", requestQueue, props, messageRequest.getBytes());

How it works…

In this example the RPC responder takes the role of an RPC server; the responder listens on the requestQueue public queue (step 1), where the callers will place their requests.

Each caller, on the other hand, consumes the responder's replies on its own private queue, created in step 2 of the caller procedure.

When the caller sends a message (step 8 of the caller procedure), it includes two properties: the name of the temporary reply queue (replyTo()) where it will be listening, and a message identifier (correlationId()), needed by the caller to identify the call when the reply comes back.

In fact, in our example we have implemented an asynchronous RPC caller. The action to be performed by the RpcCallerConsumer (caller step 3) when the reply comes back is recorded by the non-blocking consumer by calling AddAction() (caller step 7).
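The bookkeeping behind AddAction() and handleDelivery() can be illustrated without a broker. The following is only a minimal sketch under stated assumptions, not the recipe's actual code: the class PendingCalls and its methods register(), complete(), and size() are hypothetical names, and callbacks stand in for the action strings used in the example.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper (not part of the recipe's sources): it remembers the
// RPC calls that are still waiting for a reply.
final class PendingCalls {
    // correlation ID -> callback to run when the matching reply arrives
    private final Map<String, Consumer<String>> callbacks = new ConcurrentHashMap<>();

    // Registers a callback and returns a new, unique correlation ID that the
    // caller would place in the request properties.
    String register(Consumer<String> onReply) {
        String correlationId = UUID.randomUUID().toString();
        callbacks.put(correlationId, onReply);
        return correlationId;
    }

    // Would be called from the consumer's handleDelivery(): runs and forgets
    // the callback bound to this correlation ID.
    // Returns false when the ID is missing or unknown.
    boolean complete(String correlationId, String response) {
        if (correlationId == null) {
            return false;
        }
        Consumer<String> onReply = callbacks.remove(correlationId);
        if (onReply == null) {
            return false;
        }
        onReply.accept(response);
        return true;
    }

    // Number of calls still waiting for their reply.
    int size() {
        return callbacks.size();
    }

    public static void main(String[] args) {
        PendingCalls pending = new PendingCalls();
        String id = pending.register(reply -> System.out.println("Got reply: " + reply));
        pending.complete(id, "42");   // prints "Got reply: 42"
        pending.complete(id, "43");   // ignored: the ID has already been consumed
    }
}
```

Removing the entry as soon as the reply arrives keeps the map from growing, and a reply carrying an unknown correlation ID is simply dropped.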

Coming back to the responder, the RPC logic is all in the RpcResponderConsumer. This is not different from a specialized non-blocking consumer, as we have seen in the Consuming messages recipe, except for two details:

  • The reply queue name is read from the message properties, properties.getReplyTo(). Its value has been set by the caller to its private, temporary reply queue.
  • The reply message must include in its properties the correlation ID sent in the incoming message.

Tip

The correlation ID is not used by the RPC responder; it is only used to let the caller receiving this message correlate this reply with its corresponding request.
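To make the roles of the two properties concrete, here is a small in-memory model of the exchange. It is a sketch only and does not use the RabbitMQ client: ReplyToModel, Message, publish(), poll(), and respondOnce() are hypothetical names, the queues are plain Java collections, and the "echo:" prefix stands in for whatever work a real responder would do.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for a broker with named queues (illustration only).
final class ReplyToModel {

    // A message carrying the two properties discussed in the text.
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();

    // Appends a message to the named queue, creating the queue if needed.
    void publish(String queueName, Message message) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>()).add(message);
    }

    // Removes and returns the next message of the named queue, or null.
    Message poll(String queueName) {
        Queue<Message> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    // Responder logic: handle one request, if any is waiting.
    boolean respondOnce(String requestQueue) {
        Message request = poll(requestQueue);
        if (request == null) {
            return false;
        }
        String result = "echo:" + request.body;   // stand-in for the real work
        // The responder does not interpret the correlation ID; it only copies
        // it into the reply, which goes to the queue named by replyTo.
        publish(request.replyTo, new Message(null, request.correlationId, result));
        return true;
    }

    public static void main(String[] args) {
        ReplyToModel broker = new ReplyToModel();
        broker.publish("rpc.requests", new Message("reply.caller1", "id-1", "ping"));
        broker.respondOnce("rpc.requests");
        Message reply = broker.poll("reply.caller1");
        System.out.println(reply.correlationId + " -> " + reply.body); // id-1 -> echo:ping
    }
}
```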

There's more…

In this section we will discuss the use of blocking RPC and some scalability notes.

Using blocking RPC

Sometimes simplicity is more important than scalability. In this case it is possible to use the following helper classes, included in the Java RabbitMQ client library, that implement blocking RPC semantics:

com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer

The logic is identical, but there are no non-blocking consumers involved, and the handling of temporary queues and correlation IDs is transparent to the user.

You can find a working example at Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc.
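To show what blocking semantics mean in terms of correlation IDs, the following sketch layers a blocking call() on top of the same request/reply bookkeeping. It is not how RpcClient is implemented and does not use the RabbitMQ client: BlockingCallSketch and its methods are hypothetical, and a background thread running a plain Java function stands in for the broker and the responder.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Illustration only: a blocking call built on correlation IDs.
final class BlockingCallSketch {
    // correlation ID -> future completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final Function<String, String> remoteResponder;

    BlockingCallSketch(Function<String, String> remoteResponder) {
        this.remoteResponder = remoteResponder;
    }

    // Stand-in for publishing the request: the reply shows up later,
    // on another thread, as it would with a real consumer.
    private void send(String correlationId, String request) {
        Thread worker = new Thread(
            () -> onReply(correlationId, remoteResponder.apply(request)));
        worker.setDaemon(true);
        worker.start();
    }

    // Stand-in for handleDelivery(): wakes up the caller waiting on this ID.
    void onReply(String correlationId, String response) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(response);
        }
    }

    // Sends the request and blocks until the reply arrives or the timeout expires.
    String call(String request, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            send(correlationId, request);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("No reply for request: " + request, e);
        } finally {
            pending.remove(correlationId);   // late replies are dropped
        }
    }

    public static void main(String[] args) {
        BlockingCallSketch client = new BlockingCallSketch(request -> "echo:" + request);
        System.out.println(client.call("hello", 1000));
    }
}
```

The caller thread is parked inside call() for the whole round trip, which is the simplicity-for-scalability trade-off mentioned above.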

Scalability notes

What happens when there are multiple callers? It mainly works as a standard RPC client/server architecture. But what if we run many responders?

In this case all the responders will take care of consuming messages from the request queue. Furthermore, the responders can be located on different hosts. We have just got load distribution for free. More on this topic is in the recipe Distributing messages to many consumers.
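The effect of several responders sharing one request queue can be modeled in plain Java. This is a sketch under loud assumptions: CompetingResponders and run() are hypothetical names, threads stand in for responder processes, and a LinkedBlockingQueue stands in for the request queue, so the way requests are split among responders here is not meant to mirror the broker's own dispatching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: several responders consuming from one shared queue.
final class CompetingResponders {
    // Marker telling a responder to stop; requests must differ from it.
    private static final String STOP = "__stop__";

    // Handles all the (distinct) requests with the given number of responders
    // and returns, for each request, the name of the responder that handled it.
    static Map<String, String> run(List<String> requests, int responderCount) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(requests);
        for (int i = 0; i < responderCount; i++) {
            requestQueue.add(STOP);   // one stop marker per responder
        }
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> responders = new ArrayList<>();
        for (int i = 0; i < responderCount; i++) {
            String name = "responder-" + i;
            Thread responder = new Thread(() -> {
                try {
                    for (String request = requestQueue.take();
                         !request.equals(STOP);
                         request = requestQueue.take()) {
                        // A request handled twice would show up as "a,b" here.
                        handledBy.merge(request, name, (a, b) -> a + "," + b);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            responders.add(responder);
            responder.start();
        }
        for (Thread responder : responders) {
            try {
                responder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            requests.add("request-" + i);
        }
        run(requests, 3).forEach((request, who) -> System.out.println(request + " handled by " + who));
    }
}
```

Each request is taken from the queue by exactly one responder, so adding responders spreads the work without any change on the caller side.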
