Custom partition

Recall that we discussed the key serializer, the value serializer, and partitions when looking at the Kafka producer. So far, we have used only the default partitioner and the built-in serializers. Let's see how we can create a custom partitioner.

Kafka generally selects a partition based on the hash value of the key specified in the message. If the key is not specified (null), the default partitioner distributes messages across partitions in a round-robin fashion. However, sometimes you may want your own partitioning logic, for example so that records sharing an application-defined partition key go to the same partition on the broker. We will see some best practices for partitions later in this chapter. Kafka provides an API for implementing your own partitioner.
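The default behavior described above can be sketched in plain Java. This is an illustration only, not Kafka's code: the class and method names are hypothetical, and Arrays.hashCode stands in for the murmur2 hash that Kafka's own partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of hash-based and round-robin partition selection.
class DefaultPartitioningSketch {
    // Counter used only for records without a key.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Keyed record: hash the serialized key and map it onto [0, numPartitions).
    int forKey(byte[] keyBytes, int numPartitions) {
        // Assumption: Arrays.hashCode is a stand-in for Kafka's murmur2 hash.
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Null key: cycle through the partitions one after another.
    int forNullKey(int numPartitions) {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        DefaultPartitioningSketch sketch = new DefaultPartitioningSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // The same key always yields the same partition.
        System.out.println(sketch.forKey(key, 6) == sketch.forKey(key, 6)); // prints true
        // Records without a key rotate through partitions 0, 1, 2, 0, ...
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.forNullKey(3));
        }
    }
}
```

Because the partition is the hash modulo the partition count, the key-to-partition mapping changes if partitions are later added to the topic.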

In most cases, the default hash-based partitioner suffices, but in scenarios where a single key accounts for a very large share of the data, we may need to allocate a separate partition for that key. For example, if key K carries 30 percent of the total data, it can be assigned to partition N, with no other key assigned to partition N, so that the partition does not run out of space or slow down. There can be other use cases as well where you may want to write a custom partitioner. Kafka provides the Partitioner interface, which lets us implement our own partitioning logic.
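The large-key scenario can be sketched as a standalone function, independent of the Kafka classes. Everything here is an assumption made for illustration: the hot key name "K", the choice of the last partition as the reserved one, and Arrays.hashCode as the hash for the remaining keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical sketch: reserve one partition for a single high-volume key.
class HotKeyPartitionLogic {
    // Assumed hot key, matching the "key K" of the example in the text.
    static final String HOT_KEY = "K";

    static int partitionFor(String key, int numPartitions) {
        Objects.requireNonNull(key, "this sketch only handles keyed records");
        if (numPartitions < 2) {
            throw new IllegalArgumentException("need at least 2 partitions to reserve one");
        }
        // Assumption: the last partition is the one dedicated to the hot key.
        int reserved = numPartitions - 1;
        if (HOT_KEY.equals(key)) {
            return reserved;
        }
        // All other keys are hashed over the remaining partitions only,
        // so none of them can land on the reserved partition.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff;
        return hash % (numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("K", 4)); // prints 3
        // Any other key maps to one of the partitions 0 to 2.
        System.out.println(partitionFor("some-other-key", 4));
    }
}
```

Logic of this kind would go where the partition method of a Partitioner implementation computes its return value, with numPartitions taken from the cluster metadata.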

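Here is a skeleton implementation of the Partitioner interface in Java:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartition implements Partitioner {
    @Override
    public int partition(String topicName, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Look up the partitions currently known for the topic
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topicName);
        int numPartitions = partitions.size();
        // TODO: partition logic here; the result must lie in [0, numPartitions)
        return 0;
    }

    @Override
    public void close() {
        // No resources to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No custom configuration is read
    }
}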

Scala:

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.{Cluster, PartitionInfo}

class CustomPartition extends Partitioner {
  override def close(): Unit = {}

  override def partition(topicName: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    // Look up the partitions currently known for the topic
    val partitions: util.List[PartitionInfo] = cluster.partitionsForTopic(topicName)
    val numPartitions: Int = partitions.size
    // TODO: your partition logic here; the result must lie in [0, numPartitions)
    0
  }

  override def configure(map: util.Map[String, _]): Unit = {}
}
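For the producer to use a custom partitioner, the class has to be named in the producer configuration through the partitioner.class property. The following is a minimal sketch of such a configuration. The broker address, the string serializers, and the package name com.example are assumptions for illustration and should be replaced with your own values.

```java
import java.util.Properties;

// Hypothetical helper that builds producer properties naming a custom partitioner.
class CustomPartitionerConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // Assumed broker address for a local test setup.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fully qualified name of the partitioner class; the package is hypothetical.
        props.put("partitioner.class", "com.example.CustomPartition");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
        // prints com.example.CustomPartition
    }
}
```

These properties would then be passed to the KafkaProducer constructor in the same way as in the earlier producer examples.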