
Committing and polling

Polling is fetching data from a Kafka topic. Kafka returns the messages that have not yet been read by the consumer. How does Kafka know that the consumer hasn't read those messages yet?

The consumer needs to tell Kafka that it wants data from a particular offset, and therefore it needs to store the offset of the latest read message somewhere, so that in case of failure it can resume reading from the next offset.

The consumer commits the offsets of messages that it has read successfully. There are different ways in which a commit can happen, and each has its own pros and cons. Let's look at the available options:

  • Auto commit: This is the default configuration of the consumer. The consumer auto-commits the offset of the latest read messages at a configured time interval. If we set enable.auto.commit = true and auto.commit.interval.ms = 1000, the consumer will commit the offset every second. There are certain risks associated with this option. For example, suppose you set the interval to 10 seconds and the consumer starts consuming data. At the seventh second, the consumer fails. What will happen? The consumer hasn't committed the read offsets yet, so when it restarts it will resume reading from the last committed offset, and the messages processed during those seven seconds will be consumed again, leading to duplicates.
  • Current offset commit: Most of the time, we may want to control when an offset is committed. Kafka provides an API to enable this. We first need to set enable.auto.commit = false and then use the commitSync() method to commit offsets from the consumer thread. This commits the latest offsets returned by poll(). It is better to make this call after we have processed all instances of ConsumerRecord; otherwise, there is a risk of losing records if the consumer fails in between.

Java:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException ex) {
        // Logger or code to handle the failed commit
    }
}

Scala:

while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(100)
  import scala.collection.JavaConversions._
  for (record <- records) {
    printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
  }
  try {
    consumer.commitSync()
  } catch {
    case ex: CommitFailedException =>
      // Logger or code to handle the failed commit
  }
}
  • Asynchronous commit: The problem with synchronous commit is that the consumer is blocked until it receives an acknowledgment for the commit request from the Kafka server, which reduces throughput. We can avoid this by committing asynchronously. However, asynchronous commit has a drawback: it may lead to duplicate message processing in cases where the order of commit requests changes. For example, if the commit for the offset of message 10 succeeds before the commit for the offset of message 5, the latest committed offset 10 is overridden by 5, and Kafka will serve messages 5-10 to the consumer again.

Java:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            // Logger or code to handle a failed commit
        }
    });
}

Scala:

while (true) {
  val records: ConsumerRecords[String, String] = consumer.poll(100)
  import scala.collection.JavaConversions._
  for (record <- records) {
    printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
  }
  consumer.commitAsync(new OffsetCommitCallback {
    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = {
      // Logger or code to handle a failed commit
    }
  })
}
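The auto-commit settings discussed at the start of this section can be sketched as plain consumer configuration. This is a minimal sketch; the broker address and group id are placeholder values, and the helper class name is ours:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds consumer properties for either auto-commit or manual-commit mode.
    public static Properties consumerProps(boolean autoCommit) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Auto commit on: the consumer commits the latest polled offsets every second.
        // Auto commit off: the application must call commitSync() or commitAsync().
        props.put("enable.auto.commit", String.valueOf(autoCommit));
        if (autoCommit) {
            props.put("auto.commit.interval.ms", "1000");
        }
        return props;
    }
}
```

These properties would then be passed to the KafkaConsumer constructor as usual.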

You have learned about synchronous and asynchronous commits. However, the best practice is to use a combination of both: asynchronous commit after every poll call for throughput, and synchronous commit at points where the commit must succeed before proceeding, such as when a rebalance is triggered or when closing the consumer.
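The combined pattern can be sketched as follows. This assumes a consumer already subscribed to a topic and a `running` flag; `process(record)` stands in for your application-specific processing and is hypothetical:

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application-specific processing
        }
        // Fast, non-blocking commit after every poll. An occasional failure is
        // harmless here because a later asynchronous commit will supersede it.
        consumer.commitAsync();
    }
} catch (Exception ex) {
    // Logger or code to handle the failure
} finally {
    try {
        // On shutdown, block until the final offsets are committed.
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
```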

Kafka also provides you with an API to commit a specific offset.
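As a sketch of that API, the commitSync (and commitAsync) overload takes an explicit map from partitions to offsets. Note that the committed offset should be the offset of the next message to read, hence the + 1:

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Record the position of the next message to consume for this partition.
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    // Commit exactly the offsets we tracked, rather than whatever poll() returned.
    consumer.commitSync(currentOffsets);
}
```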
