Committing and polling
Polling is fetching data from a Kafka topic. Kafka returns the messages that have not yet been read by the consumer. How does Kafka know which messages the consumer hasn't read yet?
The consumer needs to tell Kafka that it wants data from a particular offset, so it must store the offset of the latest message it has read somewhere; in case of a consumer failure, it can then resume reading from the next offset.
Kafka commits the offset of messages that it reads successfully. There are different ways in which a commit can happen, and each way has its own pros and cons. Let's look at the different ways available:
- Auto commit: This is the default configuration of the consumer. The consumer auto-commits the offset of the latest read messages at a configured interval of time. If we set enable.auto.commit = true and auto.commit.interval.ms = 1000, the consumer will commit the offset every second. There are certain risks associated with this option. For example, suppose you set the interval to 10 seconds and the consumer starts consuming data. At the seventh second, your consumer fails. What happens? The consumer hasn't committed the offsets it read yet, so when it starts again it will resume from the last committed offset, and this will lead to duplicates. A configuration sketch follows this bullet.
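As a rough sketch of what this configuration looks like in code (the broker address and group id below are placeholder assumptions, not values from the text):
Java:
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
props.put("group.id", "demo-group");               // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");           // commit offsets automatically
props.put("auto.commit.interval.ms", "1000");      // commit every second
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);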
- Current offset commit: Most of the time, we want control over when an offset is committed. Kafka provides an API to enable this. We first set enable.auto.commit = false and then use the commitSync() method to commit offsets from the consumer thread. This commits the latest offsets returned by the poll. It is best to make this call after we have processed all the ConsumerRecord instances; otherwise, there is a risk of losing records if the consumer fails in between.
Java:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(2);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    try {
        consumer.commitSync();
    } catch (CommitFailedException ex) {
        // Logger or code to handle the failed commit
    }
}
Scala:
while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(2)
  import scala.collection.JavaConversions._
  for (record <- records)
    println("offset = %d, key = %s, value = %s".format(record.offset, record.key, record.value))
  try
    consumer.commitSync()
  catch {
    case ex: CommitFailedException =>
      // Logger or code to handle the failed commit
  }
}
- Asynchronous commit: The problem with synchronous commit is that the consumer is blocked until it receives an acknowledgment of the commit request from the Kafka server, and this costs throughput. Throughput can be improved by committing asynchronously. However, asynchronous commit has a problem of its own: it may lead to duplicate message processing when the order of commit requests changes. For example, suppose the offset of message 10 gets committed before the offset of message 5. The later commit of 5 overrides the already committed 10, so Kafka will serve messages 5-10 to the consumer again.
Java:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(2);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            // Check e to detect a failed commit; log or handle it here
        }
    });
}
Scala:
while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(2)
  import scala.collection.JavaConversions._
  for (record <- records)
    println("offset = %d, key = %s, value = %s".format(record.offset, record.key, record.value))
  consumer.commitAsync(new OffsetCommitCallback {
    override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = {
      // Check ex to detect a failed commit; log or handle it here
    }
  })
}
You have learned about synchronous and asynchronous commits. The best practice, however, is to use a combination of both: commit asynchronously after every poll call, and commit synchronously at the points where you must be certain the commit succeeded, such as when a rebalance is triggered or when the consumer is being closed.
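A minimal sketch of that combined pattern, reusing the consumer from the previous examples (the exception handling here is illustrative, not from the text):
Java:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(2);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
        consumer.commitAsync(); // non-blocking commit on every poll
    }
} catch (Exception ex) {
    // Logger or code to handle the failure
} finally {
    try {
        consumer.commitSync(); // blocking commit before closing, so the last offsets are not lost
    } finally {
        consumer.close();
    }
}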
Kafka also provides you with an API to commit a specific offset.
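Both commitSync() and commitAsync() have overloads that accept a map from partitions to offsets. A hedged sketch of committing a specific offset (the topic name, partition number, and the lastProcessedOffset variable are illustrative assumptions):
Java:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
// The committed offset should be the position of the next message to read,
// that is, the offset of the last processed record plus one.
// lastProcessedOffset is a hypothetical long tracked by your processing code.
offsets.put(new TopicPartition("demo-topic", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
consumer.commitSync(offsets);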