官术网_书友最值得收藏!

Consuming messages

In this recipe we are closing the loop; we have already seen how to send messages to RabbitMQ—or to any AMQP broker—and now we are ready to learn how to retrieve them.

You can find the source code of the recipe at Chapter01/Recipe03/src/rmqexample/nonblocking.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the introduction.

How to do it…

In order to consume the messages sent as seen in the previous recipe, perform the following steps:

  1. Declare the queue where we want to consume the messages from:
    String myQueue="myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null);
  2. Define a specialized consumer class inherited from DefaultConsumer:
    public class ActualConsumer extends DefaultConsumer {
      public ActualConsumer(Channel channel) {
        super(channel);
      }
      @Override
      public void handleDelivery(
        String consumerTag, 
        Envelope envelope, 
        BasicProperties properties, 
        byte[] body) throws java.io.IOException {
          String message = new String(body);
          System.out.println("Received: " + message);
        }
    }
  3. Create a consumer object, which is an instance of this class, bound to our channel:
    ActualConsumer consumer = new ActualConsumer(channel);
  4. Start consuming messages:
    String consumerTag = channel.basicConsume(myQueue, true, consumer);
  5. Once done, stop the consumer:
    channel.basicCancel(consumerTag);

How it works…

After we have established the connection and the channel to the AMQP broker as seen in the Connecting to the broker recipe, we need to ensure that the queue from which we are going to consume the messages exists (step 1).

In fact it is possible that the consumer is started before any producer has sent a message to the queue and the queue itself may actually not exist at all. To avoid the failure of the subsequent operations on the queue, we need to declare the queue.

Tip

By allowing both producers and consumers to declare the same queue, we are decoupling their existence; the order in which we start them is not important.

The heart of this recipe is step 2. Here we have defined our specialized consumer that overrides handleDelivery() and instantiated it in step 3. In the Java client API the consumer callbacks are defined by the com.rabbitmq.client.Consumer interface. We have extended our consumer from DefaultConsumer, which provides a no-operation implementation for all the methods declared in the Consumer interface.

In step 3, by calling channel.basicConsume(), we let the consumer threads start consuming messages. The consumers of each channel are always executed on the same thread, independent of the calling one.

Now that we have activated a consumer for myQueue, the Java client library starts getting messages from that queue on the RabbitMQ broker, and invokes handleDelivery() for each one.

Then after the channel.basicConsume() method's invocation, we just sit idle waiting for a key press in the main thread. Messages are being consumed with nonblocking semantics respecting the event-driven paradigm, typical of messaging applications.

Only after we press Enter, the execution proceeds to step 5, cancelling the consumer. At this point the consumer threads stop invoking our consumer object, and we can release the resources and exit.

There's more…

In this section we will learn more about consumer threads and the use of blockage semantics.

More on consumer threads

At connection definition time, the RabbitMQ Java API allocates a thread pool from which it will allocate consumer threads on need.

All the consumers bound to one channel will be executed by one single thread in the pool; however, it is possible that consumers from different channels are handled by the same thread. That's why it is important to avoid long-lasting operations in the consumer methods, in order to avoid blocking other consumers.

It is also possible to handle the consumer thread pool by ourselves, as we have shown in our example; however, this not obligatory at all. We have defined a thread pool, java.util.concurrent.ExecutorService, and passed it at connection time:

ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);

As we were managing it, we were also in charge of terminating it:

eService.shutdown();

However, remember that if you don't define your own ExecutorService thread pool, the Java client library will create one during connection creation time, and destroy it as soon as we destroy the corresponding connections.

Blocking semantics

It is possible to use blocking semantics too, but we strongly discourage this approach if it's not being used for simple applications and test cases; the recipe to consume messages is non-blocking.

However, you can find the source code for the blocking approach at Chapter01/Recipe03/src/rmqexample/blocking.

See also

You can find all the available methods of the consumer interface in the official Javadoc at

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Consumer.html

主站蜘蛛池模板: 马尔康县| 社会| 定安县| 潮州市| 高唐县| 祁门县| 庆阳市| 萨迦县| 平顶山市| 界首市| 河曲县| 皮山县| 西昌市| 平和县| 城步| 邵阳县| 华阴市| 宜章县| 吉林省| 响水县| 金湖县| 云安县| 新乐市| 十堰市| 镇远县| 榆树市| 安吉县| 和平县| 沅江市| 吉木萨尔县| 长垣县| 深圳市| 青神县| 凉城县| 南通市| 景德镇市| 博野县| 高清| 涞源县| 融水| 阜平县|