
Consumer configuration

Creating a Kafka consumer also requires a few properties to be set. There are four properties to know; the first three are mandatory and the fourth is strongly recommended:

  • bootstrap.servers: This property is similar to what we defined in Chapter 3, Deep Dive into Kafka Producers, for producer configuration. It takes a comma-separated list of Kafka broker addresses in host:port form.
  • key.deserializer: This mirrors the key serializer we specified in the producer. In the producer, we specified the class that serializes the key of the message, that is, converts the key to a byte array. In the consumer, we specify the class that deserializes the byte array back to a specific key type. The deserializer here must correspond to the serializer used in the producer; otherwise, you may get a serialization exception.
  • value.deserializer: This property specifies the class used to deserialize the message value. It must correspond to the serializer class used to produce the data; for example, if we used StringSerializer to serialize the message in the producer, we should use StringDeserializer to deserialize it.
  • group.id: This property is not mandatory for the creation of a consumer, but setting it is recommended. You learned in the previous section about consumer groups and their importance for performance. Defining a consumer group when creating an application makes consumers easier to manage and lets you add consumers to the group later if you need more throughput.
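The serializer and deserializer pairing described above can be illustrated without a broker. The following is a minimal sketch using only the Java standard library; the class SerdeMismatchSketch and its methods are hypothetical stand-ins for a serializer and deserializer pair and are not part of the Kafka API. It assumes a String key encoded as UTF-8 and an integer reader that expects exactly four bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for a serializer/deserializer pair; not Kafka classes.
final class SerdeMismatchSketch {

    // Producer side: convert a String key to a byte array (UTF-8).
    static byte[] serializeString(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side, matching the serializer: bytes back to a String.
    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Consumer side, mismatched: expects exactly 4 bytes (a big-endian int).
    static int deserializeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                "Expected 4 bytes for an int but received " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] wire = serializeString("user-42");

        // The matching pair round-trips the key unchanged.
        System.out.println(deserializeString(wire));

        // The mismatched reader rejects the same bytes.
        try {
            deserializeInt(wire);
        } catch (IllegalArgumentException e) {
            System.out.println("Mismatch: " + e.getMessage());
        }
    }
}
```

Note that a size check only catches some mismatches: a four-character key such as "abcd" would pass the length check and decode into a meaningless integer, which is why the deserializer class should be chosen to match the producer's serializer rather than relying on runtime errors.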

Let's see how to set these properties and create a consumer in practice.

Java:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "10.200.99.197:6667");
consumerProperties.put("group.id", "Demo");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

Scala:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumerProperties: Properties = new Properties()
consumerProperties.put("bootstrap.servers", "10.200.99.197:6667")
consumerProperties.put("group.id", "consumerGroup1")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerProperties)

The preceding code contains three specific things:

  • Properties object: This object is used to initialize consumer properties. The mandatory properties discussed earlier are set as key-value pairs, where the key is the property name and the value is the setting for that property.
  • Deserializer: This is also a mandatory property that tells the consumer which deserializer class to use to convert a byte array to the required object. The class can be different for key and value, but each should align with the serializer class used in the producer while publishing data to the topic. Any mismatch will lead to a serialization exception.
  • KafkaConsumer: Once the properties are set, we can create a consumer object by passing the Properties object to the KafkaConsumer constructor. The properties tell the consumer which broker addresses to connect to, the group name that the consumer should be part of, and the deserialization classes to use. Settings that are not specified, such as the offset commit strategy, fall back to their defaults.
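Because these properties are plain string key-value pairs, a missing or misspelled key is easy to overlook until the consumer is constructed. As a rough sketch, the check below uses only java.util.Properties to confirm that the four properties discussed in this section are present before the object is handed to KafkaConsumer. The class ConsumerPropertiesCheck and the method missingKeys are hypothetical helpers introduced for illustration; they are not part of the Kafka client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper; not part of the Kafka client library.
final class ConsumerPropertiesCheck {

    // The three mandatory keys plus the recommended group.id from this section.
    static final String[] EXPECTED_KEYS = {
        "bootstrap.servers", "key.deserializer", "value.deserializer", "group.id"
    };

    // Returns the expected keys that are absent or blank in props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "10.200.99.197:6667");
        consumerProperties.put("group.id", "Demo");
        consumerProperties.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // value.deserializer is deliberately left out to show the check.
        System.out.println("Missing: " + missingKeys(consumerProperties));
    }
}
```

Run as written, the check reports value.deserializer as the only missing key. In an application, a non-empty result could be logged or turned into an early failure before the consumer is created.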