背景:kafka 客户端之producer API发送消息以及简单源码分析已经介绍了producer的异步发送和异步回调发送消息的基本使用,但是都是使用内置的负载均衡策略。kafka的负载均衡是在客户端实现的。
自定义负载均衡实现在某些特殊的业务场景下我们经常会有自定义负载均衡算法的需求,在Kafka中可以通过实现Partitioner接口来自定义Partition负载均衡器。
kafka自带的有三种实现
DefaultPartitioner:如果record中指定了分区,则使用它;如果未指定分区但存在key,则根据key的hash选择分区;如果不存在分区或key,则选择在批处理已满时更改的sticky partition。UniformStickyPartitioner:如果record中指定了分区,则使用它;否则选择batch已满时更改的sticky partition。 注意:与 DefaultPartitioner 相比,record key不用作此分区器中分区策略的一部分。 具有相同键的record不保证发送到同一个分区。 有关sticky partition的详细信息,请参阅 KIP-480RoundRobinPartitioner: “循环”分区器 当用户希望将写入平均分配到所有分区时,可以使用此分区策略。这是与record key hash无关的行为
自己实现Partitioner接口
public class MyPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partitionsNum = cluster.partitionsForTopic(topic).size();
// 使用 ThreadLocalRandom 产生随机数
return ThreadLocalRandom.current().nextInt(partitionsNum);
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
}
}
new 一个producer实例的时候,把自己的负载均衡实现类的全路径名导入进去。
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");Producer异步发送消息(带回调函数和自定义Partition负载均衡)
public static void producerSendWithCallbackAndPartition(Producer完整代码producer){ // 消息对象 - ProducerRecoder for(int i=0;i<10;i++){ ProducerRecord record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i); producer.send(record, new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception e) { if(e != null){ e.printStackTrace(); }else{ System.out.println("partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset()); } } }); } // 所有的通道打开都需要关闭 producer.close(); }
public class ProducerSample {
private final static String TOPIC_NAME="xt";
public static Producer createProducer(boolean mypartition) {
Properties properties = new Properties();
//配置文件里面的变量都是静态final类型的,并且都有默认的值
//用于建立与 kafka 集群连接的 host/port
//继承的hashtable,保证了线程安全
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");
//将消息发送到kafka server, 所以肯定需要用到序列化的操作 我们这里发送的消息是string类型的,所以使用string的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
if(mypartition == true){
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");
}
return new KafkaProducer<>(properties);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Producer的主对象
Producer producer = ProducerSample.createProducer(true);
// Producer异步发送带回调函数和Partition负载均衡
producerSendWithCallbackAndPartition(producer);
}
public static void producerSendWithCallbackAndPartition(Producer producer){
// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(Recordmetadata recordmetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset());
}
}
});
}
// 所有的通道打开都需要关闭
producer.close();
}
}
kafka是怎样调用我们的负载均衡实现类的
在自己的负载均衡类上的方法打断点
然后debug运行自己的主程序
我们可以发现是如下代码调用的我们自己的实现
KafkaProducer的 partition方法
private int partition(ProducerRecordrecord, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
而这个partitioner实例正是我们自己实现的那个负载均衡类的实例,他在KafkaProducer类中被定义为了私有 final 字段。
private final Partitioner partitioner;
而partitioner 是通过如下形式来实例化的
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
config是一个ProducerConfig实例,是在KafkaProducer的构造函数new出来的。如下代码所示。而下面的Map
public KafkaProducer(Mapconfigs, Serializer keySerializer, Serializer valueSerializer) { this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer, null, null, null, Time.SYSTEM); }
那config是怎么去new 我们自定义负载均衡实现类的呢?
先前我们说到了config是由我们传入的配置信息new 出来的,而他是一个ProducerConfig类的实例,他既然保存了我们的传入的配置信息,然后他再找出来是易于反掌
如下,已经拿到了我们自定义负载均衡类的全路径
如下就是AbstractConfig类保存配置信息的字段,是一个Map,虽然是一个私有字段,但是ProducerConfig类的实例通过从AbstractConfig类继承的getClass方法(此方法是public的)拿到了配置信息
private final Mapvalues;
拿到负载均衡类的全路径之后,kafka使用自己的封装的Utils来new 实例
privateT getConfiguredInstance(Object klass, Class t, Map configPairs) { if (klass == null) return null; Object o; if (klass instanceof String) { try { o = Utils.newInstance((String) klass, t); } catch (ClassNotFoundException e) { throw new KafkaException("Class " + klass + " cannot be found", e); } } else if (klass instanceof Class>) { o = Utils.newInstance((Class>) klass); } else throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class"); if (!t.isInstance(o)) throw new KafkaException(klass + " is not an instance of " + t.getName()); if (o instanceof Configurable) ((Configurable) o).configure(configPairs); return t.cast(o); }
进去Utils.newInstance一看,对的,还是使用的反射来创建的负载均衡实例
public staticT newInstance(Class c) { if (c == null) throw new KafkaException("class cannot be null"); try { return c.getDeclaredConstructor().newInstance(); } catch (NoSuchMethodException e) { throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e); } catch (ReflectiveOperationException | RuntimeException e) { throw new KafkaException("Could not instantiate class " + c.getName(), e); } }
(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)



