使用 Java 来测试 Kafka API,运行环境基于《Kafka(三)》中搭建的集群环境;
测试电脑(windows) 需要配置host
192.168.141.131 CentOSA 192.168.141.132 CentOSB 192.168.141.133 CentOSC
maven配置
依赖坐标:org.apache.kafka:kafka-clients:2.7.0、log4j:log4j:1.2.17、org.slf4j:slf4j-log4j12:1.7.7、org.slf4j:slf4j-api:1.7.7
2. 查看&创建&删除 topic
public class KafkaTopicDML {
    /**
     * Demonstrates topic DML with the admin client: list, create, delete and
     * describe topics on the three-node cluster.
     *
     * @throws ExecutionException   if a broker-side operation fails
     * @throws InterruptedException if a blocking get() is interrupted
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Admin-client configuration: only the bootstrap servers are required.
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        // Program to the AdminClient interface; the original cast to the concrete
        // KafkaAdminClient was unnecessary. try-with-resources guarantees the
        // client is closed even when one of the calls below throws.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            // List existing topics; names().get() blocks until the result arrives.
            Set<String> names = adminClient.listTopics().names().get();
            for (String name : names) {
                System.out.println("topic 为:" + name);
            }
            // Create a topic with 3 partitions and replication factor 3.
            // all().get() makes the creation synchronous; the original
            // fire-and-forget create could race with close() and be lost.
            adminClient.createTopics(Arrays.asList(new NewTopic("topic03", 3, (short) 3)))
                    .all().get();
            // Delete topics synchronously for the same reason.
            adminClient.deleteTopics(Arrays.asList("topic05", "topic06"))
                    .all().get();
            // Describe the topics listed earlier (synchronous).
            Map<String, TopicDescription> descriptions = adminClient.describeTopics(names).all().get();
            System.out.println(descriptions);
        }
    }
}
3. 生产者& 消费者
//生产者
public class KafkaProducerDemo {
    /** Sends ten String key/value records to topic02. */
    public static void main(String[] args) {
        // 1. Producer configuration: brokers plus key/value serializers.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Generic type parameters replace the raw types so the compiler checks
        // that the record key/value types match the configured serializers.
        // try-with-resources flushes and closes the producer even on failure.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic02", "key" + i, "value" + i);
                // send() is asynchronous; close() flushes outstanding records.
                kafkaProducer.send(record);
            }
        }
    }
}
//消费者
public class KafkaConsumerDemo {
    /** Subscribes (by regex) to topic02* in group g2 and prints every record. */
    public static void main(String[] args) {
        // 1. Consumer configuration: brokers, deserializers and the consumer group.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        // Restored the generic type parameters that were stripped when this code
        // was pasted (the original "Iterator>" did not compile).
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe by pattern: every topic whose name starts with "topic02".
        kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                // ConsumerRecords is Iterable; an enhanced for replaces the
                // explicit Iterator loop.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        }
    }
}
测试验证 不指定分区的情况下
###打开一个消费者 消费了三个分区 S1
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0, topic02-1, topic02-2])
###再打开一个消费者 S2 ====>S1的一个分区被同组另一个消费者S2获取了
##S1 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 8: {consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-0, topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}
=======================================================================
##S2 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
####再打开一个消费者 S3 ====>====>S1的一个分区被同组另一个消费者S3获取了
##s1 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 9: {consumer-g2-1-27f9cab4-cacc-425c-81c8-6e9fc141580c=Assignment(partitions=[topic02-0]), consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}
###S3 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0])
=======================================================================
###再打开一个消费者 S4 ====>已经没有分区在分给新增的同组消费者了
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
=======================================================================
##关闭S2 候补S4 接替S2监听的分区
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions:
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Attempt to heartbeat failed since group is rebalancing
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Revoke previously assigned partitions
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] (Re-)joining group
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully joined group with generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully synced group in generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: topic02-2
得出:消费者组内消费者每添加一个,消费者组内部会有一个简单的负载均衡机制,当消费者组成员数目大于分区数的时候就没有分区可以分配了,会进行候补;直到其中一个消费者宕机,最后新增的才可以替补宕机消费者的分区;
当开启三个消费者S1 S2 S3时 开启生产者生产10个记录
======================================================================= ##S1 ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 7, CreateTime = 1632753328402, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key0, value = value0) ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key3, value = value3) ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 9, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key5, value = value5) ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 10, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key8, value = value8) ======================================================================= ##S2 ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 5, CreateTime = 1632753328415, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key1, value = value1) ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key2, value = value2) ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key9, value = value9) ======================================================================= ##S3 ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, 
offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key4, value = value4) ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key6, value = value6) ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key7, value = value7)
得出:消费者消费的数据是在分区内有序的,在分区之间比较是没有顺序的
4. 指定消费哪个分区,并可以指定消费开始的偏移量;此时失去消费组的特性,消费者实例之间没有任何关系了
public class KafkaConsumerDemo_1 {
    // Multiple instances of this consumer are fully independent: manual
    // assignment bypasses the consumer-group rebalancing protocol.
    public static void main(String[] args) {
        // 1. Consumer configuration. No GROUP_ID_CONFIG is needed because
        // assign() does not use group coordination.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Restored the generic type parameters stripped in the original paste.
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Manually assign partition 0 of topic02 to this consumer.
        List<TopicPartition> topics = Arrays.asList(new TopicPartition("topic02", 0));
        kafkaConsumer.assign(topics);
        // Start consuming from offset 3 instead of the committed position.
        kafkaConsumer.seek(new TopicPartition("topic02", 0), 3);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        }
    }
}
4.1负载均衡
当启动三个消费者时,发送 30 条数据得到的 log ===>默认的分区策略不是轮询而是 hash
##S1 ==》11 条: key4 key6 key7 key10 key13 key14 key23 key24 key25 key26 key29 ##S2 ==》9 条: key1 key2 key9 key11 key12 key19 key20 key21 key22 ##S3 ==》10 条: key0 key3 key5 key8 key15 key16 key17 key18 key27 key28
4.2 编写简单轮询
可以通过实现Partitioner接口 并配置到KafkaProducer中 来实现指定分区发送数据
//指定生产者分区
public class KafkaProducerDemo_2 {
    /** Sends ten records to topic02 using the custom MyPartitioner strategy. */
    public static void main(String[] args) {
        // 1. Producer configuration: brokers, serializers and the partitioner.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Pass the class name as a String, consistent with the serializer
        // entries above (the original mixed Class and String config values).
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        // Generic type parameters replace the raw types; try-with-resources
        // flushes and closes the producer.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic02", "key" + i, "value" + i);
                kafkaProducer.send(record);
            }
        }
    }
}
//MyPartitioner 是实现了轮询的分区策略类,需要实现 Partitioner 接口
public class MyPartitioner implements Partitioner {

    public MyPartitioner() {
    }

    /**
     * Routes records whose keys look like "keyN" to partition N mod the
     * topic's partition count, giving a simple round-robin-by-key spread.
     *
     * @return the target partition; partition 1 for non-numeric keys
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Use the partition count of THIS topic rather than a hard-coded 3.
        // (The original read cluster.topics().size() — the number of topics,
        // not partitions — into an unused variable and then modded by 3.)
        int numPartitions = cluster.partitionsForTopic(topic).size();
        try {
            int keyIndex = Integer.parseInt(key.toString().replace("key", ""));
            return keyIndex % numPartitions;
        } catch (NumberFormatException e) {
            // Key does not carry a numeric suffix: fall back to partition 1,
            // matching the original behavior.
            return 1;
        }
    }

    @Override
    public void close() {
    }

    // Signature matches org.apache.kafka.common.Configurable (the original
    // used a raw Map).
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
5.发送对象序列化
可以使用现成的序列化包
依赖坐标:org.apache.commons:commons-lang3:3.9
配置对象序列化
// Generic value serializer: delegates to commons-lang3 SerializationUtils,
// i.e. standard Java object serialization. The value must implement
// java.io.Serializable or the cast below throws ClassCastException.
public class ObjectSerializer implements Serializer<Object> {
    /**
     * @param topic the topic the record is headed to (unused)
     * @param data  the payload; must be {@link java.io.Serializable}
     * @return the Java-serialized byte form of {@code data}
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize((Serializable) data);
    }
}
配置对象反序列化
public class ObjectDeserializer implements Deserializer
测试对象
// Test payload for object (de)serialization. The constructor and toString
// were missing from the snippet although both are used by the producer demo
// (new DemoObj(i, uuid, date)) and visible in the logged output
// (DemoObj{f1=..., f2='...', f3=...}).
public class DemoObj implements Serializable {
    private Integer f1;  // sequence number
    private String f2;   // random UUID string
    private Date f3;     // creation timestamp

    public DemoObj(Integer f1, String f2, Date f3) {
        this.f1 = f1;
        this.f2 = f2;
        this.f3 = f3;
    }

    @Override
    public String toString() {
        // Format mirrors the console output shown in the test-result section.
        return "DemoObj{f1=" + f1 + ", f2='" + f2 + "', f3=" + f3 + "}";
    }
}
开始测试
生产者
public class KafkaProducerObjectDemo {
    /** Sends ten DemoObj values to topic04 using the custom ObjectSerializer. */
    public static void main(String[] args) {
        // 1. Producer configuration: String keys, object-serialized values.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Changed from StringSerializer: values are DemoObj instances.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
        // Generic type parameters replace the raw types; try-with-resources
        // flushes and closes the producer.
        try (KafkaProducer<String, DemoObj> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, DemoObj> record = new ProducerRecord<>(
                        "topic04", "key" + i, new DemoObj(i, UUID.randomUUID().toString(), new Date()));
                kafkaProducer.send(record);
            }
        }
    }
}
消费者
public class KafkaConsumerObjectDemo {
    /** Consumes DemoObj values from topic04 via the custom ObjectDeserializer. */
    public static void main(String[] args) {
        // 1. Consumer configuration: String keys, object-deserialized values.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Changed from StringDeserializer: values arrive as serialized objects.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        // ObjectDeserializer yields Object values, so the value type is Object.
        // Generics restored (the pasted "Iterator>" did not compile).
        KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, Object> record : records) {
                    // Prints via DemoObj.toString().
                    System.out.println(record.value());
                }
            }
        }
    }
}
测试结果 顺利反序列化
DemoObj{f1=0, f2='22c16c50-f703-4b78-ba1e-8fb2f58d9801', f3=Tue Sep 28 23:06:38 CST 2021}
DemoObj{f1=3, f2='abd6089a-3ac8-4952-b184-d09f94336893', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=5, f2='4732aafa-8a24-42b1-941d-b5efd72395e9', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=8, f2='08a56491-f238-4bba-afa5-008a39577756', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=4, f2='7bf2cb8b-e898-4581-b3ea-fecda46ba899', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=6, f2='0c14cf73-96e3-4f89-999b-5662e86900bf', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=7, f2='aa88582b-59ef-449c-a86d-c147aa1815e7', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=1, f2='b94e35a0-69ce-4190-b111-b9a65e9d60ed', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=2, f2='57c54244-9522-4689-97b3-411bd2fc587c', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=9, f2='6a3c5526-c015-425b-bdda-e242b1fda780', f3=Tue Sep 28 23:06:39 CST 2021}
6.自定义拦截器
可以对数据的发送 做一些拦截处理 ,比如发送失败处理
定义拦截器
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Called before each record is sent: rebuilds the record with a suffix
     * appended to the value (demonstrates message rewriting).
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.key(), record.value() + "sffffffffff");
    }

    /**
     * Called when a send completes, successfully or not. Exactly one of the
     * two arguments is non-null.
     * Fixed typo: the parameter type is RecordMetadata — the original
     * "Recordmetadata" did not compile.
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println(metadata);
        System.out.println(exception);
    }

    @Override
    public void close() {
    }

    // Signature matches org.apache.kafka.common.Configurable (raw Map before).
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
生产者配置
public class KafkaProducerObjectInterceptorDemo {
    /** Sends ten records to topic07 through the KafkaProducerInterceptor. */
    public static void main(String[] args) {
        // 1. Producer configuration: brokers, serializers and the interceptor.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the interceptor so onSend/onAcknowledgement are invoked.
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName());
        // Generic type parameters replace the raw types; try-with-resources
        // flushes and closes the producer.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic07", "key" + i, "value" + i);
                kafkaProducer.send(record);
            }
        }
    }
}
消费者不改动;
消费者日志
//对应上面拦截器对 ProducerRecord 的value 重新包装 //打印 消息的 value value0sffffffffff value3sffffffffff value5sffffffffff value8sffffffffff value1sffffffffff value2sffffffffff value9sffffffffff
生产者日志
//对应上面拦截器对 onAcknowledgement 方法对发送后的调用 topic07-0@10 null topic07-0@11 null topic07-0@12 null



