In this recipe we are closing the loop; we have already seen how to send messages to RabbitMQ—or to any AMQP broker—and now we are ready to learn how to retrieve them.
You can find the source code of the recipe at Chapter01/Recipe03/src/rmqexample/nonblocking.
Getting ready
To use this recipe we need to set up the Java development environment as indicated in the introduction.
How to do it…
In order to consume the messages sent as seen in the previous recipe, perform the following steps:
Declare the queue where we want to consume the messages from:
How it works…
After we have established the connection and the channel to the AMQP broker as seen in the Connecting to the broker recipe, we need to ensure that the queue from which we are going to consume the messages exists (step 1).
In fact, the consumer may be started before any producer has sent a message to the queue, in which case the queue itself may not exist at all. To keep the subsequent operations on the queue from failing, we need to declare it.
Tip
By allowing both producers and consumers to declare the same queue, we are decoupling their existence; the order in which we start them is not important.
The heart of this recipe is step 2. Here we have defined our specialized consumer that overrides handleDelivery() and instantiated it in step 3. In the Java client API the consumer callbacks are defined by the com.rabbitmq.client.Consumer interface. We have extended our consumer from DefaultConsumer, which provides a no-operation implementation for all the methods declared in the Consumer interface.
In step 3, by calling channel.basicConsume(), we let the consumer threads start consuming messages. The consumers of each channel are always executed on the same thread, independent of the calling one.
Now that we have activated a consumer for myQueue, the Java client library starts getting messages from that queue on the RabbitMQ broker, and invokes handleDelivery() for each one.
After invoking channel.basicConsume(), the main thread simply sits idle, waiting for a key press. Meanwhile, messages are consumed with non-blocking semantics, following the event-driven paradigm typical of messaging applications.
Only after we press Enter does execution proceed to step 5, cancelling the consumer. At this point the consumer threads stop invoking our consumer object, and we can release the resources and exit.
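The flow described above can be sketched end to end as follows. This is a minimal sketch, not the recipe's actual source: the class name NonBlockingConsumerSketch, the localhost broker address, the queue flags, and the auto-acknowledge setting are assumptions made for illustration. It needs the RabbitMQ Java client on the classpath and a running broker.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class NonBlockingConsumerSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker runs on the local machine
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String myQueue = "myQueue";

        // Declare the queue so that it exists even if no producer has run yet.
        // Assumed flags: durable, not exclusive, not auto-deleted. They must match
        // the flags used by the producer, or the broker rejects the declaration.
        channel.queueDeclare(myQueue, true, false, false, null);

        // Specialized consumer: only handleDelivery() is overridden,
        // DefaultConsumer supplies no-op versions of the other callbacks.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("Consumed: " + message);
            }
        };

        // Start consuming; autoAck = true is an assumption made for brevity.
        String consumerTag = channel.basicConsume(myQueue, true, consumer);

        // The main thread idles while callbacks run on the library's threads.
        System.out.println("Press Enter to stop consuming");
        System.in.read();

        // Cancel the consumer, then release the resources.
        channel.basicCancel(consumerTag);
        channel.close();
        connection.close();
    }
}
```

The consumer tag returned by basicConsume() is what identifies the consumer when it is cancelled, which is why the sketch keeps it in a variable.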
There's more…
In this section we will learn more about consumer threads and the use of blocking semantics.
More on consumer threads
When a connection is created, the RabbitMQ Java API sets up a thread pool from which it allocates consumer threads as needed.
All the consumers bound to one channel will be executed by one single thread in the pool; however, it is possible that consumers from different channels are handled by the same thread. That's why it is important to avoid long-lasting operations in the consumer methods, in order to avoid blocking other consumers.
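The effect of a long-lasting callback can be illustrated with the JDK alone. In the sketch below a single-thread executor stands in for the thread that dispatches consumer callbacks; the class and method names are hypothetical and nothing here calls the RabbitMQ client. The fast callback cannot run until the slow one has finished.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedDispatchThreadDemo {

    // Queues a slow callback and then a fast one on a single dispatch thread,
    // and returns the order in which they completed.
    static List<String> runCallbacks(long slowMillis) {
        List<String> completed = new CopyOnWriteArrayList<>();
        // Stand-in for the one thread that serves the consumers of a channel
        ExecutorService dispatchThread = Executors.newSingleThreadExecutor();

        dispatchThread.execute(() -> {
            pause(slowMillis);          // simulates a long-lasting handleDelivery()
            completed.add("slow");
        });
        dispatchThread.execute(() -> completed.add("fast"));

        dispatchThread.shutdown();
        try {
            dispatchThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The fast callback waits behind the slow one
        System.out.println(runCallbacks(500)); // prints [slow, fast]
    }
}
```

If a callback really needs to do heavy work, one option is to hand that work to a separate application-owned pool so the dispatch thread is freed quickly.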
It is also possible to manage the consumer thread pool ourselves, as we have shown in our example; however, this is not obligatory at all. We have defined a thread pool, java.util.concurrent.ExecutorService, and passed it to the connection factory when creating the connection.
As we were managing it, we were also in charge of terminating it:
eService.shutdown();
However, remember that if we don't define our own ExecutorService thread pool, the Java client library creates one when the connection is created, and destroys it as soon as we destroy the corresponding connection.
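A minimal sketch of the self-managed variant, assuming a fixed pool of ten threads and a broker on localhost (both are illustrative choices, as is the class name CustomExecutorSketch). It needs the RabbitMQ Java client on the classpath and a running broker.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomExecutorSketch {

    public static void main(String[] args) throws Exception {
        // Assumption: ten threads; size the pool for your own workload
        ExecutorService eService = Executors.newFixedThreadPool(10);

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker runs on the local machine

        // Consumer callbacks for this connection are dispatched on eService's threads
        Connection connection = factory.newConnection(eService);
        Channel channel = connection.createChannel();

        // ... declare queues and register consumers here ...

        channel.close();
        connection.close();

        // We created the pool, so we are the ones who must terminate it
        eService.shutdown();
    }
}
```

Without the final shutdown() call the pool's non-daemon threads would typically keep the JVM alive after main() returns.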
Blocking semantics
It is possible to use blocking semantics too, but we strongly discourage this approach except in simple applications and test cases; the recipe for consuming messages is non-blocking.
However, you can find the source code for the blocking approach at Chapter01/Recipe03/src/rmqexample/blocking.
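For illustration only, the idea behind blocking consumption can be shown with JDK classes alone: callbacks put each delivery on a queue, and the application thread blocks until one is available. This is not the recipe's blocking source; the class name, the method name, and the thread that stands in for the client library's callback thread are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffDemo {

    // Receives the given message bodies one by one with blocking semantics.
    static List<String> receiveBlocking(String... bodies) {
        BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

        // Stand-in for the library thread: each "callback" only enqueues the body
        Thread callbackThread = new Thread(() -> {
            for (String body : bodies) {
                deliveries.add(body);
            }
        });
        callbackThread.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < bodies.length; i++) {
                // The application thread blocks here until a delivery arrives
                String body = deliveries.poll(5, TimeUnit.SECONDS);
                if (body == null) {
                    break; // timed out, give up instead of waiting forever
                }
                received.add(body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveBlocking("first", "second")); // prints [first, second]
    }
}
```

The application thread can do nothing else while it waits, which is the main reason this style is better kept to simple programs and tests.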
See also
You can find all the available methods of the consumer interface in the official Javadoc at