轮询策略
配置代码配置信息 随机策略
配置代码配置信息 HashPartitioner (Key-ordering)策略
配置代码配置信息 分区策略可以自定义,根据公司业务来制定
轮询策略会像下图这样,将消息平均地发送到不同的分区中。轮询策略有非常优秀的负载均衡表现,总能保证消息最大限度地被平均分配到所有分区上,也是最常用的分区策略。
配置代码
package cn.com.kaf.configuration;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Round-robin partitioner: each call to {@link #partition} assigns the record
 * to the next partition index of the topic, cycling through all partitions.
 *
 * <p>The record key and value are ignored when choosing the partition.
 */
public class MyPartition implements Partitioner {

    /** Number of records routed so far; atomic because the producer may be used from several threads. */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Picks the next partition in round-robin order.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key (only printed, not used for routing)
     * @param keyBytes   serialized key (unused)
     * @param value      record value (unused)
     * @param valueBytes serialized value (unused)
     * @param cluster    cluster metadata used to look up the topic's partition list
     * @return a partition index in {@code [0, partitionCount)}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partitionCount = cluster.partitionsForTopic(topic).size();
        // The counter wraps around to negative values after Integer.MAX_VALUE calls;
        // clearing the sign bit keeps the modulo result a valid, non-negative index.
        int sequence = count.getAndIncrement() & Integer.MAX_VALUE;
        int partition = sequence % partitionCount;
        System.out.println("key: " + key + ", partition: " + partition);
        return partition;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No configuration is read by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
配置信息
在producer中添加:
# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=cn.com.kaf.configuration.MyPartition
随机策略 配置代码
package cn.com.kaf.configuration;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
/**
 * Random partitioner: every record is sent to a partition chosen uniformly
 * at random among the partitions of its topic.
 */
public class MyPartition implements Partitioner {

    /** Source of randomness used to choose a partition index. */
    private Random rng = new Random();

    /**
     * Chooses a random partition for the record.
     *
     * @param topic   topic the record is being sent to
     * @param key     record key (only printed, not used for routing)
     * @param cluster cluster metadata used to look up the topic's partition list
     * @return a partition index in {@code [0, partitionCount)}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        final int partitionCount = cluster.partitionsForTopic(topic).size();
        final int chosen = rng.nextInt(partitionCount);
        System.out.println("key:" + key + ",partition:" + chosen);
        return chosen;
    }

    /** No configuration is read by this partitioner. */
    @Override
    public void configure(Map configs) {
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
配置信息
在producer中添加:
# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=cn.com.kaf.configuration.MyPartition
HashPartitioner (Key-ordering)策略 配置代码
package cn.com.kaf.configuration;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
/**
 * Hash (key-ordering) partitioner: records with the same key are always routed
 * to the same partition, derived from the key's {@code hashCode()}.
 *
 * <p>Records without a key are all sent to partition 0.
 */
public class MyPartition implements Partitioner {

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No configuration is read by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Maps the record key to a partition via its hash code.
     *
     * @param topic   topic the record is being sent to
     * @param key     record key; {@code null} routes to partition 0
     * @param cluster cluster metadata used to look up the topic's partition list
     * @return a partition index in {@code [0, partitionCount)}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            return 0;
        }
        int rawHash = key.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE (negative), which would
        // produce a negative partition index. Map that single value to 0 and keep
        // Math.abs for everything else so existing key-to-partition assignments do not move.
        int hash = (rawHash == Integer.MIN_VALUE) ? 0 : Math.abs(rawHash);
        System.out.println("key:" + key + ",hash:" + hash);
        int partitionNum = cluster.partitionsForTopic(topic).size();
        return hash % partitionNum;
    }
}
配置信息
在producer中添加:
# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=cn.com.kaf.configuration.MyPartition
分区策略可以自定义,根据公司业务来制定



