官术网_书友最值得收藏!

Rebalance listeners

We discussed earlier that in case of addition or removal of consumer to the consumer group, Kafka triggers the rebalancer and consumer loses the ownership of the current partition. This may lead to duplicate processing when the partition is reassigned to consumer. There are some other operations such as database connection operation, file operation, or caching operations that may be part of consumer; you may want to deal with this before ownership of the partition is lost.

Kafka provides you with an API to handle such scenarios. It provides the ConsumerRebalanceListener interface that contains the onPartitionsRevoked() and onPartitionsAssigned() methods. We can implement these two methods and pass an object while subscribing to the topic using the subscribe method discussed earlier:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

public class DemoRebalancer implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//TODO: Things to Do before your partition got revoked
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//TODO : Things to do when new partition get assigned
}
}
主站蜘蛛池模板: 阳曲县| 东乡族自治县| 泾源县| 鞍山市| 安远县| 金湖县| 沾益县| 湘阴县| 塔城市| 宜城市| 太仆寺旗| 大关县| 常熟市| 黄大仙区| 扬中市| 天等县| 安康市| 宝清县| 修水县| 黄石市| 尼玛县| 唐山市| 卢湾区| 虹口区| 滁州市| 临沭县| 淮阳县| 紫金县| 巴南区| 加查县| 遵义市| 游戏| 无极县| 噶尔县| 凉山| 曲麻莱县| 轮台县| 油尖旺区| 哈巴河县| 隆子县| 平南县|