Guaranteeing message processing

In this example we will show how to use explicit acknowledgments (acks) while consuming messages.

A message is stored in a queue until one consumer gets the message and sends the ack back to the broker.

The ack can be either implicit or explicit. In the previous examples we have used the implicit ack.

To see this example in action, run the publisher from the Producing messages recipe together with the consumer for this recipe, which you can find in the book archive at Chapter01/Recipe09/Java_9/.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

In order to guarantee that the messages have been acknowledged by the consumer after processing them, you can perform the following steps:

  1. Declare a queue:
    channel.queueDeclare(myQueue, true, false, false, null);
  2. Bind the consumer to the queue, specifying false for the autoAck parameter of basicConsume():
    ActualConsumer consumer = new ActualConsumer(channel);
    boolean autoAck = false; // n.b.
    channel.basicConsume(myQueue, autoAck, consumer);
  3. Consume a message and send the ack:
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws java.io.IOException {
        String message = new String(body);
        // Process the message here, then acknowledge this delivery only (multiple = false)
        this.getChannel().basicAck(envelope.getDeliveryTag(), false);
    }

How it works…

After we created the queue (step 1), we added the consumer to the queue and defined the ack behavior (step 2).

Setting autoAck = false tells the RabbitMQ client API that we are going to send the explicit ack ourselves.

After we have received a message from the queue, we must confirm to RabbitMQ that we have received and properly processed it by calling channel.basicAck() (step 3). The message will be removed from the queue only when RabbitMQ receives the ack.

Tip

If you don't send the ack back, the consumer continues to fetch subsequent messages; however, when you disconnect the consumer, all the messages will still be in the queue. A message is not considered consumed until RabbitMQ receives the corresponding ack. Try commenting out the basicAck() call in the example to observe this behavior.
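The behavior described in this tip can be pictured with a small in-memory model. This is only a sketch: the class ToyAckQueue and all of its methods are hypothetical names invented for illustration, it does not use the RabbitMQ client, and it says nothing about how the broker is actually implemented. It only shows that a delivered message is held until it is acked, and that unacked deliveries become available again once the consumer goes away.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a queue with explicit acks (hypothetical, for illustration).
// It mirrors the rule in the text; it is NOT how RabbitMQ is implemented.
class ToyAckQueue {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to the consumer and returns its delivery tag,
    // or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Explicit ack: only now does the message leave the queue for good.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer disconnected: every delivery it never acked becomes ready again.
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i)); // reverse walk keeps the original order
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ToyAckQueue queue = new ToyAckQueue();
        queue.publish("a");
        queue.publish("b");
        queue.publish("c");

        long first = queue.deliver();
        queue.deliver();
        queue.deliver();
        queue.ack(first);             // only "a" is acknowledged

        queue.consumerDisconnected(); // "b" and "c" go back to the queue
        System.out.println(queue.readyCount()); // prints 2
    }
}
```

In the model, as in the tip, skipping the ack does not stop further deliveries; it only means the messages are still there after the consumer leaves.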

The method channel.basicAck() has two parameters:

  • deliveryTag
  • multiple

The deliveryTag parameter is a value assigned by the server to the message, which you can retrieve using envelope.getDeliveryTag(), as in step 3.

If multiple is set to false, the client acknowledges only the message identified by the deliveryTag parameter; otherwise, the client acknowledges all the messages up to and including that one. This flag allows us to optimize message consumption by sending a single ack to RabbitMQ for a block of messages instead of one ack per message.
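One way to use the multiple flag is to ack once every N processed messages. The following is a minimal sketch under stated assumptions: BatchAcker and its nested AckCall interface are hypothetical helpers, not part of the RabbitMQ client. AckCall merely stands in for a call with the same shape as channel.basicAck(long deliveryTag, boolean multiple), so the example runs without a broker. It also assumes messages are processed in delivery order on a single channel.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: sends one ack with multiple = true for every batchSize messages.
class BatchAcker {
    // Stand-in with the same shape as channel.basicAck(long deliveryTag, boolean multiple).
    interface AckCall {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    private final AckCall channel;
    private final int batchSize;
    private int pending = 0;  // processed but not yet acknowledged
    private long lastTag = 0; // most recent processed delivery tag

    BatchAcker(AckCall channel, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.channel = channel;
        this.batchSize = batchSize;
    }

    // Call this after a message has been fully processed.
    void processed(long deliveryTag) throws IOException {
        lastTag = deliveryTag;
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    // Acknowledge whatever is still pending, for example before closing the channel.
    void flush() throws IOException {
        if (pending == 0) {
            return;
        }
        channel.basicAck(lastTag, true); // acks everything up to and including lastTag
        pending = 0;
    }

    public static void main(String[] args) throws IOException {
        List<String> calls = new ArrayList<>();
        // The lambda records each ack instead of talking to a broker.
        BatchAcker acker = new BatchAcker((tag, multiple) -> calls.add(tag + ":" + multiple), 2);
        for (long tag = 1; tag <= 5; tag++) {
            acker.processed(tag);
        }
        acker.flush();
        System.out.println(calls); // prints [2:true, 4:true, 5:true]
    }
}
```

The trade-off is that messages in an incomplete batch stay unacknowledged for longer, so they remain in the queue if the consumer disconnects before the next ack is sent.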

Tip

A message must be acknowledged only once; if you try to acknowledge the same message more than once, the method raises a precondition-failed exception.

Calling channel.basicAck(0, true) acknowledges all the unacknowledged messages; here the 0 stands for "all the messages".

Furthermore, calling channel.basicAck(0,false) raises an exception.
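The rules for deliveryTag and multiple can be summarized in a toy model. This sketch is not the broker's code: ToyChannelAcks is a hypothetical class, the IllegalStateException stands in for the precondition-failed error, and the handling of an unknown non-zero tag with multiple = true is an assumption of mine rather than something stated above. Unlike a real channel, the model also stays usable after an error.

```java
import java.util.TreeSet;

// Toy model of per-channel ack bookkeeping (hypothetical, for illustration only).
class ToyChannelAcks {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Simulates the server delivering a message and assigning it a delivery tag.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple && deliveryTag == 0) {
            unacked.clear(); // 0 with multiple = true means "all the messages"
            return;
        }
        if (!unacked.contains(deliveryTag)) {
            // Covers a second ack of the same tag and basicAck(0, false).
            // Assumption: an unknown tag with multiple = true is treated the same way.
            throw new IllegalStateException("PRECONDITION_FAILED: unknown delivery tag " + deliveryTag);
        }
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear(); // everything up to and including the tag
        } else {
            unacked.remove(deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ToyChannelAcks channel = new ToyChannelAcks();
        channel.deliver();
        channel.deliver();
        channel.deliver();                          // tags 1, 2 and 3

        channel.basicAck(2, true);                  // acknowledges tags 1 and 2
        System.out.println(channel.unackedCount()); // prints 1

        try {
            channel.basicAck(2, false);             // tag 2 was already acknowledged
        } catch (IllegalStateException e) {
            System.out.println("second ack rejected");
        }

        channel.deliver();                          // tag 4
        channel.basicAck(0, true);                  // acknowledges everything outstanding
        System.out.println(channel.unackedCount()); // prints 0
    }
}
```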

There's more…

In the next chapter we will discuss the basicReject() method, which gives the consumer further flexibility by letting it refuse a message instead of acknowledging it.

See also

The Distributing messages to many consumers recipe gives a real-world example that better illustrates the use of explicit acks.
