
Point-to-point communication

The following diagram provides an overview of the scenario that we will implement:

For point-to-point communication, the sender can use either the default exchange or a direct exchange. A direct exchange uses the routing key to determine the queue to which a message is sent: the routing key must match the binding key between the exchange and the queue. The CompetingReceiver class subscribes to a particular queue and receives messages from it:
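The routing rule described above can be illustrated without a broker. The following is a minimal in-memory sketch, not the RabbitMQ client API: the class DirectExchangeSketch, its methods, and the audit_queue queue are hypothetical names introduced only for illustration. It shows that a message reaches a queue only when its routing key equals the binding key exactly. In RabbitMQ, the default exchange behaves like a direct exchange to which every queue is implicitly bound under its own name, which the first binding below imitates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; this is NOT the RabbitMQ client API.
final class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    // Binds a queue to this exchange under the given binding key.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        // Imitates the default exchange: binding key equals the queue name.
        exchange.bind("event_queue", "event_queue");
        // Hypothetical second queue bound under a different key.
        exchange.bind("audit_queue", "audit");

        System.out.println(exchange.route("event_queue"));
        System.out.println(exchange.route("unknown"));
    }
}
```

A routing key that matches no binding key yields an empty list here, so the message in that case would not reach any queue.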

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class CompetingReceiver {

    private final static String QUEUE_NAME = "event_queue";
    private final static Logger LOGGER = LoggerFactory.getLogger(CompetingReceiver.class);
    private Connection connection = null;
    private Channel channel = null;
    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

The receive() method is used to receive a message from the queue named event_queue by doing the following:

  • Creating the event_queue in the message broker, if not already created, using the queueDeclare() method
  • Creating a QueueingConsumer instance that is used as the handler for messages from the event_queue queue
  • Registering the QueueingConsumer as a message consumer using the basicConsume() method of the Channel instance that represents the AMQP channel to the message broker
  • Consuming a message from the event_queue queue using the nextDelivery() method of the QueueingConsumer instance, which blocks until a message arrives on the queue; QueueingConsumer.Delivery represents the received message:
        public String receive() {
            if (channel == null) {
                initialize();
            }
            String message = null;
            try {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(QUEUE_NAME, true, consumer);
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                message = new String(delivery.getBody());
                LOGGER.info("Message received: " + message);
                return message;

            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ShutdownSignalException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ConsumerCancelledException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return message;
        }
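One detail worth noting in receive() is that new String(byte[]) decodes the message body with the platform default charset, which may differ between the sending and the receiving machine. A minimal sketch of decoding with an explicit charset follows. It assumes that the sender encodes the text as UTF-8, which the source does not state; the class BodyDecodingSketch and its decode() method are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: decodes a message body with an explicit charset.
final class BodyDecodingSketch {

    // Assumption: the sender encoded the text as UTF-8.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stands in for the bytes returned by delivery.getBody().
        byte[] body = "Test message.".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body));
    }
}
```

If both sides agree on the charset in this way, the decoded text no longer depends on the JVM settings of either machine.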

The destroy() method closes the AMQP connection and must be called explicitly when needed; closing the connection closes all AMQP channels created in that connection:

    public void destroy() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }

To demonstrate the use of the CompetingReceiver class in a point-to-point channel, we can use the DefaultExchangeSenderDemo class to send a message to the default exchange:

public class DefaultExchangeSenderDemo {

    public static void sendToDefaultExchange() {
        Sender sender = new Sender();
        sender.initialize();
        sender.send("Test message.");
        sender.destroy();
    }

    public static void main(String[] args) {
        sendToDefaultExchange();
    }
}

When the main() method is invoked, a message is sent to the RabbitMQ server instance running on localhost; if no instance is running, a java.net.ConnectException is thrown from the client. Assuming that no queues are defined yet in the message broker, if you open the RabbitMQ management console you will notice the following before invoking the main() method:

After invoking the main() method, you will notice that the event_queue is created:

Moreover, there is one unprocessed message in the queue; the Ready section gives the number of unprocessed messages in that queue. To consume the message, use the CompetingReceiverDemo class:

public class CompetingReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final CompetingReceiver receiver1 = new CompetingReceiver();
        receiver1.initialize();
        final CompetingReceiver receiver2 = new CompetingReceiver();
        receiver2.initialize();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                receiver1.receive();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                receiver2.receive();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        receiver1.destroy();
        receiver2.destroy();
    }
}

We create two CompetingReceiver instances and invoke their receive() methods in separate threads, so that two subscribers wait for a message on the same queue. The main application thread joins both threads, so main() continues only after each consumer has received a message from the queue. Since our queue already has one message, one of the two consumers will receive it while the other continues to wait. If we invoke the main() method of the DefaultExchangeSenderDemo class once again, the other consumer will also receive a message from the queue and the main() method of CompetingReceiverDemo will terminate.
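The competing-consumer behavior described above can be reproduced without a broker. The following is a minimal in-memory sketch that uses a java.util.concurrent.BlockingQueue in place of event_queue; it is not the RabbitMQ client API, and the class CompetingConsumersSketch and its run() method are hypothetical names introduced for illustration. Each consumer thread blocks until a message is available and takes exactly one, so every message is handed to exactly one consumer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration only; a BlockingQueue stands in for event_queue.
final class CompetingConsumersSketch {

    // Starts the given number of consumer threads, each taking one message
    // from a shared queue, and returns "consumer-N:message" records.
    // Like the demo in the text, this waits forever if fewer messages
    // than consumers are supplied.
    static List<String> run(List<String> messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[consumers];

        for (int i = 0; i < consumers; i++) {
            final int id = i + 1;
            threads[i] = new Thread(() -> {
                try {
                    // Blocks until a message arrives on the shared queue.
                    String message = queue.take();
                    received.add("consumer-" + id + ":" + message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        // Publish after the consumers are already waiting.
        queue.addAll(messages);

        for (Thread t : threads) {
            try {
                t.join(); // wait until this consumer has its message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Which consumer gets which message is not deterministic.
        System.out.println(run(Arrays.asList("first", "second"), 2));
    }
}
```

Which consumer receives which message varies from run to run; the only guarantee shown is that no message is delivered twice.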
