- Building Data Streaming Applications with Apache Kafka
- Manish Kumar, Chanchal Singh
Committing and polling
Polling is the act of fetching data from a Kafka topic. Kafka returns the messages that the consumer has not yet read. How does Kafka know which messages the consumer hasn't read yet?
The consumer needs to tell Kafka that it wants data starting from a particular offset, so it must store the offset of the latest message it has read; in case of consumer failure, it can then resume reading from the next offset.
Kafka commits the offsets of messages that are read successfully. There are several ways a commit can happen, and each has its own pros and cons. Let's look at the available options:
- Auto commit: This is the default consumer configuration. The consumer auto-commits the offset of the latest read messages at a configured time interval. If we set enable.auto.commit = true and auto.commit.interval.ms = 1000, the consumer commits offsets every second. There are certain risks associated with this option. For example, suppose you set the interval to 10 seconds and the consumer starts consuming data. If the consumer fails at the seventh second, what happens? The consumer has not yet committed the offsets it read, so when it restarts, it will resume reading from the last committed offset, and this will lead to duplicate processing.
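As a minimal sketch of the auto-commit settings described above (the bootstrap address and group ID are placeholders, not values from the text):

```java
import java.util.Properties;

public class AutoCommitConfig {
    // Builds consumer properties with auto commit enabled every second.
    // Broker address and group ID below are illustrative placeholders.
    public static Properties buildAutoCommitProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "true");      // auto-commit on
        props.setProperty("auto.commit.interval.ms", "1000"); // commit every second
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildAutoCommitProps();
        System.out.println(props.getProperty("enable.auto.commit"));
        System.out.println(props.getProperty("auto.commit.interval.ms"));
    }
}
```

These same properties would be passed to the KafkaConsumer constructor; with auto-commit on, no explicit commit calls are needed in the poll loop.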
- Current offset commit: Most of the time, we want control over when an offset is committed. Kafka provides an API to enable this. We first set enable.auto.commit = false and then call the commitSync() method from the consumer thread. This commits the latest offset returned by the last poll. It is best to call this method after processing all instances of ConsumerRecord; otherwise, there is a risk of losing records if the consumer fails in between.
Java:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(2);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    try {
        consumer.commitSync();
    } catch (CommitFailedException ex) {
        // Logger or code to handle the failed commit
    }
}
Scala:
while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(2)
  import scala.collection.JavaConversions._
  for (record <- records)
    printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
  try
    consumer.commitSync()
  catch {
    case ex: CommitFailedException =>
      // Logger or code to handle the failed commit
  }
}
- Asynchronous commit: The problem with synchronous commit is that the consumer is blocked until it receives an acknowledgment of the commit offset request from the Kafka server, which lowers throughput. Throughput can be improved by making the commit happen asynchronously. However, asynchronous commit has a drawback: it may lead to duplicate message processing when commit requests complete out of order. For example, suppose the commit for the offset of message 10 succeeds before the commit for the offset of message 5. Because the later commit of offset 5 overrides the earlier commit of offset 10, Kafka will serve messages 5 through 10 to the consumer again.
Java:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(2);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            // Handle a failed commit here if e is non-null
        }
    });
}
Scala:
while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(2)
  for (record <- records)
    printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
  consumer.commitAsync(new OffsetCommitCallback {
    override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = {
      // Handle a failed commit here if ex is non-null
    }
  })
}
You have learned about synchronous and asynchronous commits. However, the best practice is to use a combination of both: commit asynchronously after every poll call, and commit synchronously on events such as a rebalance being triggered, the consumer closing due to some condition, and so on.
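This combined pattern can be sketched as follows. This is a sketch, not a complete program: it assumes a consumer that is already subscribed to a topic, a running flag toggled by a shutdown hook, and a hypothetical process() method standing in for your record handling.

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            process(record); // hypothetical per-record processing
        // Fast, non-blocking commit on the normal path
        consumer.commitAsync();
    }
} finally {
    try {
        // Blocking commit on shutdown guarantees the final offsets are stored
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
```

The asynchronous commit keeps the hot loop fast, while the final synchronous commit covers the one moment where losing a commit actually matters: the last one before the consumer goes away.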
Kafka also provides you with an API to commit a specific offset.
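For illustration, committing a specific offset might look like the following sketch. The topic name, partition number, and offset value are placeholders; note that the committed offset should be the offset of the next message you want to read, not the last one processed.

```java
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
// Commit offset 30 for partition 0 of "test-topic" (placeholder values);
// on restart, the consumer will resume reading from offset 30.
offsets.put(new TopicPartition("test-topic", 0), new OffsetAndMetadata(30));
consumer.commitSync(offsets);
```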