官术网_书友最值得收藏!

Using body serialization with JSON

In AMQP the messages are opaque entities; AMQP does not provide any standard way to encode/decode them.

However, web applications very often use JSON as an application layer format, that is, the JavaScript serialization format that has become a de-facto standard; in this way, the RabbitMQ client Java library can include some utility functions for this task.

On the other side, this is not the only protocol; any application can choose its own protocol (XML, Google Protocol Buffers, ASN.1, or proprietary).

In this example we are showing how to use the JSON protocol to encode and decode message bodies. We are using a publisher written in Java (Chapter01/Recipe04/Java_4/src/rmqexample) and a consumer in Python (Chapter01/Recipe04/Python04).

Getting ready

To use this recipe you will need to set up Java and Python environments as described in the introduction.

How to do it…

To implement a Java producer and a Python consumer, you can perform the following steps:

  1. Java: In addition to the standard import described in the recipe Connecting to the broker, we have to import:
    import
    com.rabbitmq.tools.json.JSONWriter;
  2. Java: We create a queue that is not persistent:
    String myQueue="myJSONBodyQueue_4";
      channel.queueDeclare(MyQueue, false, false, false, null);
  3. Java: We create a list for the Book class and fill it with example data:
    List<Book>newBooks = new ArrayList<Book>();
      for (inti = 1; i< 11; i++) {
        Book book = new Book();
        book.setBookID(i);
        book.setBookDescription("History VOL: " + i  );
        book.setAuthor("John Doe");
        newBooks.add(book);
      }
  4. Java: We are ready to serialize the newBooks instance with JSONwriter:
    JSONWriter rabbitmqJson = new JSONWriter();
    String jsonmessage = rabbitmqJson.write(newBooks);
  5. Java: We can finally send our jsonmessage:
    channel.basicPublish("",MyQueue,null, jsonmessage.getBytes());
  6. Python: To use the Pika library we must add the follow import:
    import pika;
    import json;

    Python has a powerful built-in library for JSON.

  7. Python: In order to create a connection to RabbitMQ, use the following code:
    connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
  8. Python: Let's declare a queue, bind as a consumer, and then register a callback:
    channel = connection.channel()
    my_queue = "myJSONBodyQueue_4"
    channel.queue_declare(queue=my_queue)
    channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True) 
    channel.start_consuming()

How it works…

After we set up the environments (step 1 and step 2), we serialize the newbooks class with the method write(newbooks). The method returns a JSON String (jsonmessage) as shown in the following code snippet:

[
  {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 1",
    "bookID" : 1
  },
  {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 2",
    "bookID" : 2
  }
]

In step 4 we publish jsonmessage to the queue myJSONBodyQueue_4. Now the Python Consumer can get the message from the same queue. Let's see how to do it in Python:

  connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
  channel = connection.channel()
  queue_name = "myJSONBodyQueue_4"
  channel.queue_declare(queue=my_queue)
  ..
  channel.basic_consume(consumer_callback, queue=my_queue, no_ack=True) 
  channel.start_consuming()

As we have seen in the Java implementation, we must create a connection and then create a channel. With the method channel.queue_declare(queue=myQueue), we declare a queue that is not durable, exclusive or autodelete. In order to change the queue's attribute, it's enough to add the parameter in the queue_declare method as follows:

channel.queue_declare(queue=myQueue,durable=True)

Tip

When different AMQP clients declare the same queue, it's important that all of them specify the same durable, exclusive, and autodelete attributes. Otherwise, channel.queue_declare() will raise an exception.

With the method channel.basic_consume(), the client starts consuming messages from the given queue, invoking the callback consumer_callback()where it will receive the messages.

While the callbacks in Java were defined in the consumer interface, in Python they are just passed to basic_consume(), in spite of the more functional, less declarative, and less formal paradigm typical of Python.

The callback consumer_callback is as follows:

def consumer_callback(ch, method, properties, body):
  newBooks=json.loads(body); 
  print" Count books:",len(newBooks);
  for item in newBooks:
    print 'ID:',item['bookID'], '-Description:',item['bookDescription'],' -Author:',item['author']

The callback takes the message, deserializes it with json.loads(), and then the newBooks structure is ready to be read.

There's more…

The JSON helper tools included in the RabbitMQ client library are very simple, but in a real project you can evaluate them to use an external JSON library. For example, a powerful Java JSON library is google-gson (https://code.google.com/p/google-gson/) or jackson (http://jackson.codehaus.org/).

主站蜘蛛池模板: 长武县| 邮箱| 沐川县| 南漳县| 井研县| 开平市| 安达市| 碌曲县| 绥阳县| 交口县| 凤冈县| 康平县| 玉林市| 阳泉市| 双桥区| 建宁县| 凤翔县| 莱州市| 杂多县| 泗阳县| 镇江市| 柳江县| 册亨县| 通渭县| 扶风县| 凤山市| 镇沅| 枣庄市| 江都市| 邳州市| 武陟县| 铜梁县| 江陵县| 当阳市| 平和县| 孙吴县| 彩票| 和林格尔县| 浏阳市| 乃东县| 类乌齐县|