
Managing rejected or expired messages

In this example, we show how to manage expired or rejected messages using dead letter exchanges. The dead letter exchange is a normal exchange where dead messages are redirected; if not specified, dead messages are just dropped by the broker.

The source is in Chapter02/Recipe04/Java/src/rmqexample, which contains the following files:

  • Producer.java
  • Consumer.java

To try expired messages, you can run the producer alone; it publishes messages with a given TTL, as shown in the How to let messages expire on specific queues recipe.

Once started, the consumer in this example rejects every message before it can expire, which also produces dead messages.

Getting ready

To use this recipe, we need to set up the Java development environment, as indicated in the Introduction section of Chapter 1, Working with AMQP.

How to do it...

Complete the following steps to show how to manage expired or rejected messages using dead letter exchanges:

  1. We create the work exchange and the dead letter exchange:
    channel.exchangeDeclare(Constants.exchange, "direct", false);
    channel.exchangeDeclare(Constants.exchange_dead_letter, "direct", false);
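  2. We create the queue with the x-message-ttl and x-dead-letter-exchange arguments:
    // Optional queue arguments: message TTL in milliseconds and the dead letter exchange name
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-message-ttl", 10000);
    arguments.put("x-dead-letter-exchange", exchange_dead_letter);
    channel.queueDeclare(queue, false, false, false, arguments);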
  3. Then, we bind queue as follows:
    channel.queueBind(queue, exchange, "");
  4. We can finally send the messages to exchange using channel.basicPublish().
  5. To try the rejected messages, we have to configure a consumer, as we have seen in the previous examples, and reject the message using the following code:
    // requeue = false, so the broker dead letters the message instead of requeuing it
    channel.basicReject(envelope.getDeliveryTag(), false);

How it works...

Let's start with the first scenario (using the producer alone): the expired messages. In step 1, we create two exchanges, the working exchange and the dead letter exchange. In step 2, we create the queue with the following two optional parameters:

  • Message TTL using arguments.put("x-message-ttl", 10000), as seen in the How to let messages expire on specific queues recipe.
  • The dead letter exchange name using arguments.put("x-dead-letter-exchange", exchange_dead_letter).

As you can see, we simply add the configuration to the optional queue arguments. When the producer sends a message to exchange, it is routed to queue. If no consumer takes it, the message expires after 10 seconds (10000 milliseconds) and is then redirected to exchange_dead_letter.
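The argument map from step 2 can be sketched on its own, without a broker connection. The class name DeadLetterQueueArguments and the method build are hypothetical helpers introduced only for this illustration; the two argument keys are the ones used in the recipe.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterQueueArguments {

    /**
     * Builds the optional queue arguments used in step 2.
     * (Hypothetical helper, not part of the recipe's source.)
     */
    public static Map<String, Object> build(String deadLetterExchange, int ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        // Messages left in the queue longer than ttlMillis become dead messages
        arguments.put("x-message-ttl", ttlMillis);
        // Dead messages are republished to this exchange instead of being dropped
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = build("exchange_dead_letter", 10000);
        System.out.println(arguments);
        // With an open channel, the map would be passed as the last parameter:
        // channel.queueDeclare(queue, false, false, false, arguments);
    }
}
```

Keeping the map construction in one place makes it harder to declare the same queue twice with different arguments, which the broker would refuse.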

Note

The dead letter exchange is a normal exchange, so you can use any exchange for this purpose.

For the second scenario, the consumer of this recipe rejects messages. When this consumer gets a message, it sends back a negative acknowledgement using basicReject() with requeue set to false. When the broker receives the rejection, it redirects the message to exchange_dead_letter. By binding a queue to the dead letter exchange, you can manage these messages.

When a message is redirected to a dead letter queue, the broker modifies the message headers and adds the following values under the x-death key:

  • reason: This denotes why the message was dead lettered, that is, whether it expired or was rejected (with requeue = false)
  • queue: This denotes the queue source, for example, stat_queue_02/05
  • time: This denotes the date and time the message was dead lettered
  • exchange: This denotes the exchange source, for example, monitor_exchange_02/05
  • routing-keys: This denotes the original routing keys used to send the message

To see these values in action, you can use the GetOneDeadLetterQ class. This class creates a queue_dead_letter queue and binds it to exchange_dead_letter.
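As a rough sketch of how these header values could be read, the following assumes that the headers arrive as a Map<String, Object> (as returned by properties.getHeaders() in the Java client) and that x-death holds a list of maps, one per dead-lettering event. The class DeathHeaderReader and its describe method are hypothetical names for this illustration, and the sample data in main is made up to mirror the list above. Values are converted with string concatenation, since the client may deliver strings as non-String objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeathHeaderReader {

    /** Returns one summary line per x-death entry found in the headers. */
    public static List<String> describe(Map<String, Object> headers) {
        List<String> lines = new ArrayList<>();
        if (headers == null) {
            return lines; // message carried no headers at all
        }
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return lines; // message was never dead lettered
        }
        for (Object entry : (List<?>) raw) {
            if (!(entry instanceof Map)) {
                continue; // skip anything that is not a table
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            lines.add("reason=" + death.get("reason")
                    + " queue=" + death.get("queue")
                    + " exchange=" + death.get("exchange")
                    + " routing-keys=" + death.get("routing-keys")
                    + " time=" + death.get("time"));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hand-built sample standing in for a real message's headers
        Map<String, Object> death = new HashMap<>();
        death.put("reason", "rejected");
        death.put("queue", "stat_queue_02/05");
        death.put("exchange", "monitor_exchange_02/05");
        death.put("routing-keys", Arrays.asList(""));
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Arrays.asList(death));
        for (String line : describe(headers)) {
            System.out.println(line);
        }
    }
}
```

In a consumer, describe(properties.getHeaders()) could be called inside handleDelivery() for messages taken from the queue bound to exchange_dead_letter.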

There's more...

You can also specify the dead letter routing key using arguments.put("x-dead-letter-routing-key", "myroutingkey"). This parameter replaces the original routing key. This means that you can redirect different messages with different routing keys to the same queue. That's great!
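The effect of this parameter can be illustrated with a small model. The class DeadLetterRoutingKey and both of its methods are hypothetical; in particular, keyUsedForDeadLetter only imitates the choice the broker makes and is not a client API. The routing key orders.eu is an invented example value.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterRoutingKey {

    /** Builds queue arguments; pass null to keep the original routing key. */
    public static Map<String, Object> queueArguments(String deadLetterExchange,
                                                     String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return arguments;
    }

    /**
     * Illustrative model only: the key a dead message is republished with.
     * The queue-level key, when set, replaces the original one.
     */
    public static String keyUsedForDeadLetter(Map<String, Object> arguments,
                                              String originalKey) {
        Object override = arguments.get("x-dead-letter-routing-key");
        return override != null ? override.toString() : originalKey;
    }

    public static void main(String[] args) {
        Map<String, Object> withKey = queueArguments("exchange_dead_letter", "myroutingkey");
        Map<String, Object> withoutKey = queueArguments("exchange_dead_letter", null);
        System.out.println(keyUsedForDeadLetter(withKey, "orders.eu"));    // prints "myroutingkey"
        System.out.println(keyUsedForDeadLetter(withoutKey, "orders.eu")); // prints "orders.eu"
    }
}
```

With the override in place, a single binding on exchange_dead_letter for myroutingkey is enough to collect every dead message from that queue.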
