- Building Data Streaming Applications with Apache Kafka
- Manish Kumar, Chanchal Singh
Producer object and ProducerRecord object
The producer accepts a ProducerRecord object to send records to a topic. A ProducerRecord contains the topic name, partition number, timestamp, key, and value. The partition number, timestamp, and key are optional, but the topic to which data will be sent and the value that contains the data are mandatory.
- If the partition number is specified, then the specified partition will be used when sending the record
- If the partition is not specified but a key is specified, a partition will be chosen using a hash of the key
- If both key and partition are not specified, a partition will be assigned in a round-robin fashion
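The three rules above can be sketched in plain Java. This is an illustrative simplification only: the names `PartitionChooser` and `choose` are invented for this example, and Kafka's actual DefaultPartitioner hashes keys with murmur2 rather than `String.hashCode()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the producer's partition-selection rules.
// Not Kafka's real DefaultPartitioner, which uses murmur2 hashing.
class PartitionChooser {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger(0);

    PartitionChooser(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int choose(Integer partition, String key) {
        if (partition != null) {
            return partition;                                      // rule 1: explicit partition wins
        }
        if (key != null) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;  // rule 2: hash of the key
        }
        return counter.getAndIncrement() % numPartitions;          // rule 3: round-robin
    }
}
```

Note that records with the same non-null key always land in the same partition, which is what gives Kafka its per-key ordering guarantee.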
Here is ProducerRecord in Java:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, data);
Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
Here is an example of ProducerRecord in Scala:
val producerRecord = new ProducerRecord[String, String](topicName, data)
val recordMetadata = producer.send(producerRecord)
We have different constructors available for ProducerRecord:
- Here is the first constructor for ProducerRecord:
ProducerRecord(String topicName, Integer partition, K key, V value)
- The second constructor goes something like this:
ProducerRecord(String topicName, Integer partition, Long timestamp, K key, V value)
- The third constructor is as follows:
ProducerRecord(String topicName, K key, V value)
- The final constructor of our discussion is as follows:
ProducerRecord(String topicName, V value)
Each record also has a timestamp associated with it. If we do not mention a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the particular topic:
- CreateTime: The timestamp of ProducerRecord will be used to append a timestamp to the data
- LogAppendTime: The Kafka broker will overwrite the producer-supplied timestamp with the broker's own time when the message is appended to the log
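Which of these two applies is controlled per topic by the `message.timestamp.type` configuration, which defaults to CreateTime. As a sketch, assuming a topic named my-topic and a broker at localhost:9092, it could be switched with the standard tooling:

```shell
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config message.timestamp.type=LogAppendTime
```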
Once data is sent using the send() method, the broker persists that message to the partition log and returns RecordMetadata, the server's metadata for the record: its offset, checksum, timestamp, topic, serializedKeySize, and so on. We previously discussed common message publishing patterns. The sending of messages can be either synchronous or asynchronous.
Synchronous messaging: The producer sends a message and waits for the broker's reply. The Kafka broker either returns an error or RecordMetadata. We can deal with errors depending on their type. This kind of messaging reduces throughput, as the producer waits for the response before sending the next message.
Generally, Kafka retries sending the message in case certain connection errors occur. However, errors related to serialization, message, and so on have to be handled by the application, and in such cases, Kafka does not try to resend the message and throws an exception immediately.
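The distinction can be sketched in plain Java. Everything here is invented for illustration (the exception classes stand in for kafka-clients' RetriableException hierarchy, and MAX_RETRIES stands in for the producer's `retries` configuration); it is not the producer's actual retry loop.

```java
// Sketch of retriable vs. non-retriable send errors. All names are invented
// stand-ins; the real producer retries internally, governed by the `retries` setting.
class SendRetrySketch {
    static class RetriableSendException extends RuntimeException {}
    static class SerializationFailure extends RuntimeException {}

    static final int MAX_RETRIES = 3; // assumed value for illustration

    // flakyAttempts simulates transient connection failures before a send succeeds;
    // serializable = false simulates a record the serializer cannot handle.
    static String sendWithRetry(int flakyAttempts, boolean serializable) {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            try {
                return trySend(attempt, flakyAttempts, serializable);
            } catch (RetriableSendException e) {
                // transient connection error: try again
            } catch (SerializationFailure e) {
                return "failed: not retriable"; // surfaced to the application immediately
            }
        }
        return "failed: retries exhausted";
    }

    private static String trySend(int attempt, int flakyAttempts, boolean serializable) {
        if (!serializable) throw new SerializationFailure();
        if (attempt < flakyAttempts) throw new RetriableSendException();
        return "acked";
    }
}
```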
Java:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, data);
RecordMetadata recordMetadata = producer.send(producerRecord).get();
Scala:
val producerRecord = new ProducerRecord[String, String](topicName, data)
val recordMetadata = producer.send(producerRecord).get()
Asynchronous messaging: Sometimes, we have a scenario where we do not want to deal with the response immediately, or we do not mind losing a few messages and want to handle failures later.
Kafka provides us with the Callback interface, which helps in dealing with the broker's reply, whether it indicates an error or success. send() can accept an object that implements the Callback interface:
send(ProducerRecord<K,V> record, Callback callback)
The callback interface contains the onCompletion method, which we need to override. Let's look at the following example:
Here is the example in Java:
public class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
        if (ex != null) {
            // deal with the exception here
        } else {
            // deal with RecordMetadata here
        }
    }
}
Scala:
class ProducerCallback extends Callback {
    override def onCompletion(recordMetadata: RecordMetadata, ex: Exception): Unit = {
        if (ex != null) {
            // deal with the exception here
        } else {
            // deal with RecordMetadata here
        }
    }
}
Once we have the Callback class implemented, we can simply use it in the send method as follows:
val recordMetadataFuture = producer.send(producerRecord, new ProducerCallback())
If Kafka has raised an exception for the message, the exception object passed to onCompletion() will be non-null; otherwise it will be null. This lets us handle successful and failed messages accordingly in onCompletion().