开发自定义分区器
上一节我们看到,如果在发送消息的时候没有指定对应的分区,会使用默认分区器对消息进行分区,这一节我们试着写一个自己的分区器
默认分区器源码阅读
- 进入默认分区器
1 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
1 | ********** |
自定义分区器
implements Partitioner
1
public class MyPartition implements Partitioner {
重写partition、close、configure方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
System.out.println("Customed Partitioner is running...");
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null){
throw new InvalidRecordException("key cannot be null..");
}else {
if (((String)key).equals("1")){
return 1;
}else {
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions));
}
}
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}使用
1
kafkaProps.put("partitioner.class","com.shiyanlou.MyPartition");
demo
1 | package com.shiyanlou; |
1 | package com.shiyanlou.producer; |