Producer object and ProducerRecord object

Producer accepts the ProducerRecord object to send records to a topic. ProducerRecord contains a topic name, partition number, timestamp, key, and value. The partition number, timestamp, and key are optional parameters, but the topic to which the data will be sent and the value that contains the data are mandatory.

  • If the partition number is specified, then the specified partition will be used when sending the record
  • If the partition is not specified but a key is specified, a partition will be chosen using a hash of the key
  • If both key and partition are not specified, a partition will be assigned in a round-robin fashion
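The three rules above can be illustrated with a simplified stand-in partitioner. Note this is only a sketch of the selection logic: Kafka's real default partitioner hashes keys with murmur2 (not hashCode), and newer clients use a sticky strategy instead of strict round-robin for keyless records.

```java
public class PartitionChooser {
    private int roundRobinCounter = 0;

    // Simplified stand-in for Kafka's default partition selection:
    // an explicit partition wins, then a hash of the key, then round-robin.
    public int choosePartition(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            return partition; // case 1: partition specified explicitly
        }
        if (key != null) {
            // case 2: derive the partition from the key
            // (Kafka actually uses murmur2, not hashCode)
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // case 3: neither key nor partition -> round-robin
        return roundRobinCounter++ % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser();
        System.out.println(chooser.choosePartition(2, "user-42", 6)); // prints 2
        System.out.println(chooser.choosePartition(null, null, 6));   // prints 0
        System.out.println(chooser.choosePartition(null, null, 6));   // prints 1
    }
}
```

Records with the same key always land on the same partition (as long as the partition count does not change), which is what gives Kafka per-key ordering guarantees.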

Here is how to create a ProducerRecord in Java:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, data);
Future<RecordMetadata> recordMetadata = producer.send(producerRecord);

Here is an example of a ProducerRecord in Scala (note that Scala uses square brackets for type parameters):

val producerRecord = new ProducerRecord[String, String](topicName, data)
val recordMetadata = producer.send(producerRecord)

We have different constructors available for ProducerRecord:

  • Here is the first constructor for ProducerRecord:
ProducerRecord(String topicName, Integer partition, K key, V value)
  • The second constructor goes something like this:
ProducerRecord(String topicName, Integer partition, Long timestamp, K key, V value)
  • The third constructor is as follows:
ProducerRecord(String topicName, K key, V value)
  • The final constructor of our discussion is as follows:
ProducerRecord(String topicName, V value)

Each record also has a timestamp associated with it. If we do not mention a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the particular topic:

  • CreateTime: The timestamp carried in ProducerRecord (set by the application or stamped by the producer) is the timestamp stored with the message
  • LogAppendTime: The Kafka broker overwrites the producer's timestamp with the broker's current time when the message is appended to the log
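The timestamp type is a per-topic setting, message.timestamp.type. As a sketch (the topic name events is just an example, and older Kafka versions expect --zookeeper instead of --bootstrap-server), it can be set when creating a topic with the standard CLI tool:

```shell
# Create a topic whose broker overwrites producer timestamps on append
bin/kafka-topics.sh --create --topic events \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1 \
  --config message.timestamp.type=LogAppendTime
```

If the config is omitted, the topic defaults to CreateTime.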

Once data is sent using the send() method, the broker persists that message to the partition log and returns RecordMetadata, the server's response metadata for the record, which includes the offset, checksum, timestamp, topic, serializedKeySize, and so on. We previously discussed common message publishing patterns. The sending of messages can be either synchronous or asynchronous.

Synchronous messaging: The producer sends a message and waits for the broker's reply. The Kafka broker returns either an error or RecordMetadata. We can deal with errors depending on their type. This kind of messaging reduces throughput and increases latency, as the producer waits for the response before sending the next message.

Generally, Kafka retries sending the message when certain connection errors occur. However, errors related to serialization, message size, and so on have to be handled by the application; in such cases, Kafka does not try to resend the message and throws an exception immediately.
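How many times the producer retries transient errors, and how long it waits between attempts, is controlled by producer configuration. A sketch of the relevant properties (the values shown are illustrative, not recommendations):

```
# Retry transient send failures (e.g. leader election in progress)
# up to 3 times, waiting 100 ms between attempts
retries=3
retry.backoff.ms=100
```

Non-retriable errors, such as a record exceeding the maximum message size, fail immediately regardless of these settings.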

Java:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, data);

RecordMetadata recordMetadata = producer.send(producerRecord).get();

Scala:

val producerRecord = new ProducerRecord[String, String](topicName, data)

val recordMetadata = producer.send(producerRecord).get()

Asynchronous messaging: Sometimes we do not want to handle responses immediately, or we do not mind losing a few messages and prefer to deal with failures after some time.

Kafka provides us with the Callback interface, which helps in handling the broker's reply whether the send succeeded or failed. send() can accept an object that implements the Callback interface:

send(ProducerRecord<K, V> record, Callback callback)

The Callback interface contains the onCompletion method, which we need to override. Here is an example in Java:

public class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
        if (ex != null) {
            // deal with the exception here
        } else {
            // deal with RecordMetadata here
        }
    }
}

Scala:

class ProducerCallback extends Callback {
  override def onCompletion(recordMetadata: RecordMetadata, ex: Exception): Unit = {
    if (ex != null) {
      // deal with the exception here
    } else {
      // deal with RecordMetadata here
    }
  }
}

Once we have the Callback class implemented, we can simply pass it to the send method as follows:

val futureResponse = producer.send(producerRecord, new ProducerCallback())

If Kafka returned an error for the message, the exception object passed to onCompletion() will be non-null, so we can handle successful and failed messages accordingly inside onCompletion().
