pom.xml:
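<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>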
测试代码:
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Demo producer that sends a fixed number of keyed String records to a topic
 * asynchronously and prints the partition and offset reported for each one.
 */
public class ProducerTest {

    /** Number of demo records sent by {@link #send(String)}. */
    private static final int RECORD_COUNT = 10;

    /** Address of the Kafka broker used to bootstrap the client. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.7:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        send("new-topic-user");
    }

    /**
     * Sends {@code RECORD_COUNT} records with keys {@code key-0 .. key-9} and values
     * {@code value-0 .. value-9} to the given topic.
     *
     * @param name the topic to send to
     * @throws ExecutionException   kept in the signature for existing callers
     * @throws InterruptedException kept in the signature for existing callers
     */
    public static void send(String name) throws ExecutionException, InterruptedException {
        // try-with-resources guarantees the producer is closed even if building or
        // sending a record throws; close() also waits for in-flight sends to finish.
        try (Producer<String, String> producer = ProducerTest.createProducer()) {
            for (int i = 0; i < RECORD_COUNT; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        name,
                        "key-" + i,
                        "value-" + i
                );
                // Send asynchronously; the callback runs once the send completes or fails.
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }

    /**
     * Builds a producer with String key/value serializers and the custom partitioner.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static Producer<String, String> createProducer() {
        // Producer configuration
        Properties properties = new Properties();
        // Broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializer class for record keys
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for record values
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Partitioner class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kaven.kafka.producer.PartitionLoadBalancer");
        return new KafkaProducer<>(properties);
    }
}
Producer自定义Partition负载均衡需要实现org.apache.kafka.clients.producer.Partitioner接口。
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Partitioner that routes a record by the number embedded in its key.
 *
 * <p>For a String key of the form {@code prefix-N} (for example {@code key-7}) the
 * partition is {@code N} modulo the topic's partition count. Keys that do not have
 * that form fall back to a hash of the serialized key bytes instead of failing.
 */
public class PartitionLoadBalancer implements Partitioner {

    /** Separator between the key prefix and its numeric part. */
    private static final String KEY_SEPARATOR = "-";

    /**
     * Chooses the partition for a record.
     *
     * @param topic      the topic the record is sent to
     * @param key        the record key, may be {@code null}
     * @param keyBytes   the serialized key, may be {@code null}
     * @param value      the record value (unused)
     * @param valueBytes the serialized value (unused)
     * @param cluster    current cluster metadata
     * @return a partition index in {@code [0, partitionCount)}
     * @throws IllegalStateException if the cluster has no partition metadata for the topic
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // The count is a boxed Integer that is null when the topic has no metadata;
        // check it explicitly rather than failing on auto-unboxing.
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        if (partitionCount == null || partitionCount <= 0) {
            throw new IllegalStateException("No partition metadata available for topic " + topic);
        }
        // floorMod keeps the result non-negative even when the fallback hash is negative.
        int num = Math.floorMod(keyNumber(key, keyBytes), partitionCount);
        System.out.println(key + " : " + num);
        return num;
    }

    /** Returns the number after the first separator in the key, or a hash of the key bytes if there is none. */
    private static int keyNumber(Object key, byte[] keyBytes) {
        if (key instanceof String) {
            String[] parts = ((String) key).split(KEY_SEPARATOR);
            if (parts.length > 1) {
                try {
                    return Integer.parseInt(parts[1]);
                } catch (NumberFormatException ignored) {
                    // Not a numeric suffix; fall through to the hash-based fallback.
                }
            }
        }
        return keyBytes == null ? 0 : java.util.Arrays.hashCode(keyBytes);
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** This partitioner takes no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
PartitionLoadBalancer取消息key中“-”后面的数字(例如key-7中的7),对主题的分区数取模来选择分区(这里可以根据自己的需求来设计)。
输出:
key-0 : 0
key-1 : 1
key-2 : 2
key-3 : 0
key-4 : 1
key-5 : 2
key-6 : 0
key-7 : 1
key-8 : 2
key-9 : 0
partition: 2 offset: 37
partition: 2 offset: 38
partition: 2 offset: 39
partition: 0 offset: 42
partition: 0 offset: 43
partition: 0 offset: 44
partition: 0 offset: 45
partition: 1 offset: 50
partition: 1 offset: 51
partition: 1 offset: 52
输出符合预期,Producer自定义Partition负载均衡就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



