- RabbitMQ Cookbook
- Sigismondo Boschi Gabriele Santomaggio
- 2021-07-19 18:52:48
Using RPC with messaging
Remote Procedure Calls (RPC) are commonly used with client-server architectures. The client asks the server to perform some action and then waits for the server's reply.
The messaging paradigm tries to enforce a totally different approach with the fire-and-forget messaging style, but it is possible to use properly designed AMQP queues to perform and enhance RPC, as shown in the following figure:

The figure shows the request queue associated with the responder and the reply queues associated with the callers.
However, when we use RabbitMQ, all the involved peers (both the callers and the responders) are AMQP clients.
We are now going to describe the steps performed in the example in Chapter01/Recipe05/Java_5/src/rmqexample/rpc.
Getting ready
To use this recipe we need to set up the Java development environment as indicated in the Introduction section.
How to do it…
Let's perform the following steps to implement the RPC responder:
1. Declare the request queue where the responder will be waiting for the RPC requests:
   channel.queueDeclare(requestQueue, false, false, false, null);
2. Define our specialized consumer RpcResponderConsumer by overriding DefaultConsumer.handleDelivery(), as already seen in the Consuming messages recipe. On the reception of each RPC request, this consumer will:
   - Perform the action required in the RPC request
   - Prepare the reply message
   - Set the correlation ID in the reply properties by using the following code:
     BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
   - Publish the answer on the reply queue:
     getChannel().basicPublish("", properties.getReplyTo(), replyProperties, reply.getBytes());
   - Send the ack to the RPC request:
     getChannel().basicAck(envelope.getDeliveryTag(), false);
3. Start consuming messages, until we stop it as already seen in the Consuming messages recipe.
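The responder's per-request logic can be modeled without a broker. The following is a minimal in-memory sketch, not the code from the recipe: SketchMessage and ResponderSketch are hypothetical names, a map of BlockingQueue instances stands in for the RabbitMQ queues, and the "echo:" action is a placeholder. It only illustrates the two things the responder has to get right: copying the correlation ID into the reply and publishing to the queue named by replyTo.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for an AMQP message: a body plus the two
// properties that the RPC pattern relies on.
final class SketchMessage {
    final String correlationId;
    final String replyTo; // null on replies
    final byte[] body;

    SketchMessage(String correlationId, String replyTo, byte[] body) {
        this.correlationId = correlationId;
        this.replyTo = replyTo;
        this.body = body;
    }
}

final class ResponderSketch {
    // Assumption: queue name -> in-memory queue, in place of a real broker.
    static final Map<String, BlockingQueue<SketchMessage>> queues =
            new ConcurrentHashMap<>();

    static void publish(String queueName, SketchMessage message) {
        queues.computeIfAbsent(queueName, k -> new LinkedBlockingQueue<>())
              .add(message);
    }

    // Mirrors the responder's substeps, except the ack, which has no
    // in-memory equivalent here.
    static void handleRequest(SketchMessage request) {
        String input = new String(request.body, StandardCharsets.UTF_8);
        String reply = "echo:" + input; // placeholder for the real action
        SketchMessage replyMessage = new SketchMessage(
                request.correlationId, // same ID as the request
                null,
                reply.getBytes(StandardCharsets.UTF_8));
        publish(request.replyTo, replyMessage); // route by reply queue name
    }
}
```

With a real channel, publish() would correspond to basicPublish() on the default exchange, using the reply queue name as the routing key.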
Now let's perform the following steps to implement the RPC caller:
4. Declare the request queue where the responder will be waiting for the RPC requests:
   channel.queueDeclare(requestQueue, false, false, false, null);
5. Create a temporary, private, autodelete reply queue:
   String replyQueue = channel.queueDeclare().getQueue();
6. Define our specialized consumer RpcCallerConsumer, which will take care of receiving and handling RPC replies. It will:
   - Allow us to specify what to do when it gets the replies (in our example, by defining AddAction())
   - Override handleDelivery():
     public void handleDelivery(String consumerTag, Envelope envelope,
         AMQP.BasicProperties properties, byte[] body) throws java.io.IOException {
       String messageIdentifier = properties.getCorrelationId();
       String action = actions.get(messageIdentifier);
       actions.remove(messageIdentifier);
       String response = new String(body);
       OnReply(action, response);
     }
7. Start consuming reply messages by invoking channel.basicConsume().
8. Prepare and serialize the requests (messageRequest in our example).
9. Initialize an arbitrary, unique message identifier (messageIdentifier).
10. Define what to do when the consumer gets the corresponding reply, by binding the action with the messageIdentifier. In our example we do it by calling our custom method consumer.AddAction().
11. Publish the message to requestQueue, setting its properties:
    BasicProperties props = new BasicProperties.Builder()
        .correlationId(messageIdentifier)
        .replyTo(replyQueue).build();
    channel.basicPublish("", requestQueue, props, messageRequest.getBytes());
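The caller-side bookkeeping (bind an action to a message identifier, then look it up and remove it when the reply arrives) can be sketched in isolation. This is an illustrative model under stated assumptions, not the recipe's RpcCallerConsumer: the class name CallerBookkeepingSketch, the use of UUID for the identifier, and the handled list that records outcomes are all hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class CallerBookkeepingSketch {
    // correlation ID -> action to run when the matching reply arrives
    private final Map<String, String> actions = new ConcurrentHashMap<>();
    // Records what was done with each reply; stands in for OnReply().
    final List<String> handled = new ArrayList<>();

    // Binds an action to a fresh identifier and returns the identifier,
    // which the caller would then set as the request's correlation ID.
    String addAction(String action) {
        String messageIdentifier = UUID.randomUUID().toString();
        actions.put(messageIdentifier, action);
        return messageIdentifier;
    }

    // Analogous to handleDelivery(): look up the action, remove it so a
    // duplicate reply is ignored, then act on the response.
    boolean onDelivery(String correlationId, String response) {
        if (correlationId == null) {
            return false; // reply without a correlation ID cannot be matched
        }
        String action = actions.remove(correlationId);
        if (action == null) {
            return false; // unknown or already-handled reply
        }
        handled.add(action + " -> " + response);
        return true;
    }

    int pending() {
        return actions.size();
    }
}
```

Removing the entry in the same call that reads it keeps the map from growing and makes a late duplicate reply harmless. A production caller would probably also expire entries whose replies never arrive.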
How it works…
In this example the RPC responder takes the role of an RPC server; the responder listens on the requestQueue public queue (step 1), where the callers will place their requests.
Each caller, on the other hand, will consume the responder replies on its own private queue, created in step 5.
When the caller sends a message (step 11), it includes two properties: the name of the temporary reply queue (replyTo()) where it will be listening, and a message identifier (correlationId()), needed by the caller to identify the call when the reply comes back.
In fact, in our example we have implemented an asynchronous RPC caller. The action to be performed by the RpcCallerConsumer (step 6) when the reply comes back is recorded by the nonblocking consumer by calling AddAction() (step 10).
Coming back to the responder, the RPC logic is all in the RpcResponderConsumer. This is not different from a specialized non-blocking consumer, as we have seen in the Consuming messages recipe, except for two details:
- The reply queue name is obtained from the message properties, properties.getReplyTo(). Its value has been set by the caller to its private, temporary reply queue.
- The reply message must include in its properties the correlation ID sent in the incoming message.
There's more…
In this section we will discuss the use of blocking RPC and some scalability notes.
Sometimes simplicity is more important than scalability. In this case it is possible to use the following helper classes, included in the Java RabbitMQ client library, that implement blocking RPC semantics:
- com.rabbitmq.client.RpcClient
- com.rabbitmq.client.StringRpcServer
The logic is identical, but there are no non-blocking consumers involved, and the handling of temporary queues and correlation IDs is transparent to the user.
You can find a working example at Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc.
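To see how blocking semantics can be layered on top of the same correlation ID mechanism, consider the following sketch. It is not the implementation of the library's helper classes: BlockingCallSketch is a hypothetical name, the sender callback stands in for publishing a request, and the choice of CompletableFuture with a timeout is an assumption made for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

final class BlockingCallSketch {
    // correlation ID -> future completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();
    // Hypothetical transport: (correlationId, request) -> sends the request.
    private final BiConsumer<String, String> sender;

    BlockingCallSketch(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    // Invoked by whatever component consumes the reply queue.
    void onReply(String correlationId, String response) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(response);
        }
    }

    // Sends the request and blocks until the reply arrives or the timeout
    // expires; failures are reported as IllegalStateException.
    String call(String request, long timeoutMillis) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future); // register before sending to avoid a race
        try {
            sender.accept(id, request);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no reply within timeout", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e);
        } finally {
            pending.remove(id); // never leak an entry, whatever the outcome
        }
    }
}
```

The caller thread blocks in call(), which is simple to use but ties up one thread per outstanding request. That is the simplicity-for-scalability trade-off described above.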
What happens when there are multiple callers? It works much like a standard RPC client/server architecture. But what if we run many responders?
In this case all the responders consume messages from the same request queue, and they can be located on different hosts. We get load distribution for free. More on this topic is in the recipe Distributing messages to many consumers.
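The effect of several responders sharing one request queue can be modeled in memory. This is a rough sketch under stated assumptions: CompetingRespondersSketch is a hypothetical name, threads stand in for responder processes, and a LinkedBlockingQueue stands in for the request queue. RabbitMQ dispatches to the consumers of a queue in round-robin fashion by default, whereas in this model the assignment depends on thread scheduling. The property illustrated is the same in both: each request is handled by exactly one responder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class CompetingRespondersSketch {
    // Starts responderCount workers on one shared queue and returns, for
    // each request ID, the name of the worker that handled it.
    static Map<String, String> run(List<String> requestIds, int responderCount) {
        // The queue is filled up front, so an empty poll() means "done".
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(requestIds);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < responderCount; i++) {
            String name = "responder-" + i;
            Thread worker = new Thread(() -> {
                String id;
                // poll() hands each element to one caller only.
                while ((id = requestQueue.poll()) != null) {
                    handledBy.put(id, name);
                }
            });
            workers.add(worker);
            worker.start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handledBy;
    }
}
```

Because every reply carries the correlation ID and goes to the caller's own reply queue, the caller does not need to know which responder served a given request.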