
Kafka (4): Kafka & Java Basic API



1. Environment

We use Java to exercise the Kafka API. The runtime environment is the cluster built in Kafka (3).

The test machine (Windows) needs the following hosts entries:

192.168.141.131 CentOSA
192.168.141.132 CentOSB
192.168.141.133 CentOSC

Maven configuration:

		
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
</dependency>
2. List, Create, Delete, and Describe Topics
import org.apache.kafka.clients.admin.*;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaTopicDML {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. Create an AdminClient; this holds the connection configuration
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");

        AdminClient adminClient = AdminClient.create(properties);

        ListTopicsResult listTopics = adminClient.listTopics();
        Set<String> names = listTopics.names().get();
        for (String name : names) {
            System.out.println("topic: " + name);
        }

        // Create asynchronously
        adminClient.createTopics(Arrays.asList(new NewTopic("topic03", 3, (short) 3)));

        // Create synchronously
        //CreateTopicsResult topic05 = adminClient.createTopics(Arrays.asList(new NewTopic("topic05", 3, (short) 3)));
        //topic05.all().get();

        // Delete asynchronously
        adminClient.deleteTopics(Arrays.asList("topic05", "topic06"));

        // Delete synchronously
        //DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic05", "topic06"));
        //deleteTopicsResult.all().get();

        // Describe synchronously
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names);
        Map<String, TopicDescription> descriptions = describeTopicsResult.all().get();
        System.out.println(descriptions);

        // Close the adminClient
        adminClient.close();
    }
}
3. Producer & Consumer
// Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        //1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            // Send
            kafkaProducer.send(record);
        }

        kafkaProducer.close();
    }
}

// Consumer
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        //1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String, String> next = iterator.next();
                    System.out.println(next);
                }
            }
        }
    }
}
Test: behavior when no partition is specified
### Start one consumer (S1): it consumes all three partitions
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0, topic02-1, topic02-2])

### Start a second consumer (S2) ===> one of S1's partitions is taken over by S2, a member of the same group
## S1 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 8: {consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-0, topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}

=======================================================================

##S2 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])

### Start a third consumer (S3) ===> another of S1's partitions is taken over by S3, a member of the same group
## S1 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 9: {consumer-g2-1-27f9cab4-cacc-425c-81c8-6e9fc141580c=Assignment(partitions=[topic02-0]), consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}

###S3 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0])

=======================================================================

### Start a fourth consumer (S4) ===> no partitions are left to assign to the new group member
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])

=======================================================================
## Shut down S2: the standby S4 takes over the partition S2 was listening to
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: 
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Attempt to heartbeat failed since group is rebalancing
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Revoke previously assigned partitions 
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] (Re-)joining group
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully joined group with generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully synced group in generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: topic02-2

Conclusion: each time a consumer joins the group, the group performs a simple rebalance to redistribute partitions. Once the number of group members exceeds the number of partitions, new members receive no partitions and act as standbys; only when an active consumer goes down does a standby take over its partitions.
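The standby behavior can be seen from the assignment arithmetic alone. The sketch below is not the Kafka client's actual assignor, just a simplified even-split simulation (assuming 3 partitions, as in the topic above) showing why a fourth group member ends up with an empty assignment:

```java
import java.util.ArrayList;
import java.util.List;

public class AssignSim {
    // Simplified even-split assignment: partition p goes to consumer p % consumers.
    // Not Kafka's range/round-robin assignor, only an illustration of the counts.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) result.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) result.get(p % consumers).add(p);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 1)); // [[0, 1, 2]]        one consumer owns everything
        System.out.println(assign(3, 3)); // [[0], [1], [2]]    one partition each
        System.out.println(assign(3, 4)); // [[0], [1], [2], []] the 4th member gets nothing
    }
}
```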

With the three consumers S1, S2, S3 running, start the producer and produce 10 records.

=======================================================================
##S1
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 7, CreateTime = 1632753328402, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key0, value = value0)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key3, value = value3)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 9, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key5, value = value5)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 10, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key8, value = value8)

=======================================================================
##S2
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 5, CreateTime = 1632753328415, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key1, value = value1)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key2, value = value2)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key9, value = value9)

=======================================================================
##S3
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key4, value = value4)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key6, value = value6)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadonly = false), key = key7, value = value7)

Conclusion: records are ordered within a partition; across partitions there is no ordering guarantee.

4. Consuming from a Specific Partition

You can also specify the offset at which consumption starts. Note that assigning partitions manually forgoes consumer-group semantics: the consumer instances no longer coordinate with each other.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerDemo_1 {
    // Multiple instances can run at once; they are completely independent of each other
    public static void main(String[] args) {
        //1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");  // no group id needed here

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<TopicPartition> topics = Arrays.asList(new TopicPartition("topic02", 0));
        // Specify which topic and which partition to consume
        kafkaConsumer.assign(topics);
        kafkaConsumer.seek(new TopicPartition("topic02", 0), 3); // optionally set the starting offset

        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String, String> next = iterator.next();
                    System.out.println(next);
                }
            }
        }
    }
}
4.1 Load Balancing

With three consumers running, send 30 records. The logs below show that the default partitioning is not round-robin but hash-based (on the record key):

## S1 ==> 11 records
key4
key6
key7
key10
key13
key14
key23
key24
key25
key26
key29

## S2 ==> 9 records
key1
key2
key9
key11
key12
key19
key20
key21
key22
## S3 ==> 10 records
key0
key3
key5
key8
key15
key16
key17
key18
key27
key28
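The uneven 11/9/10 split is what key hashing produces: the default partitioner hashes the key bytes and takes the result modulo the partition count. The sketch below illustrates the idea in plain Java, using `String.hashCode` as a stand-in for Kafka's actual murmur2 hash, so the exact buckets will differ from the logs above:

```java
import java.util.HashMap;
import java.util.Map;

public class HashPartitionSketch {
    // Stand-in for hash(keyBytes) % numPartitions; masked to stay non-negative.
    // Illustrative only: Kafka's DefaultPartitioner uses murmur2, not hashCode.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 30; i++) {
            counts.merge(partitionFor("key" + i, numPartitions), 1, Integer::sum);
        }
        // 30 keys spread unevenly across the 3 partitions, as in the logs
        System.out.println(counts);
    }
}
```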
4.2 Writing a Simple Round-Robin-Style Partitioner

By implementing the Partitioner interface and configuring it on the KafkaProducer, you can control which partition each record is sent to.

// Producer with a custom partitioner
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo_2 {

    public static void main(String[] args) {
        //1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

}
// MyPartitioner implements the Partitioner interface and provides the round-robin-style partitioning strategy
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        try {
            int keyNum = Integer.parseInt(key.toString().replace("key", ""));
            return keyNum % numPartitions;
        } catch (NumberFormatException e) {
            // Fall back to partition 1 for keys that do not end in a number
            return 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
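For sequential keys key0..key29, the numeric-modulo rule above behaves like round-robin and balances three partitions exactly. A quick check of the mapping logic in isolation (the same arithmetic as MyPartitioner, without the Kafka API):

```java
import java.util.Arrays;

public class ModuloPartitionCheck {
    // Same rule as MyPartitioner.partition, isolated from Kafka for testing.
    static int partitionFor(String key, int numPartitions) {
        try {
            return Integer.parseInt(key.replace("key", "")) % numPartitions;
        } catch (NumberFormatException e) {
            return 1; // same fallback as MyPartitioner
        }
    }

    public static void main(String[] args) {
        int[] counts = new int[3];
        for (int i = 0; i < 30; i++) {
            counts[partitionFor("key" + i, 3)]++;
        }
        System.out.println(Arrays.toString(counts)); // [10, 10, 10]
    }
}
```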
5. Sending Serialized Objects

We can use an off-the-shelf serialization library:

		 
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>

The object serializer:

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;

public class ObjectSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize((Serializable) data);
    }
}

The object deserializer:

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

public class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }
}
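Under the hood, SerializationUtils wraps standard Java serialization. A self-contained roundtrip using only the JDK (no commons-lang3) shows what the serializer/deserializer pair above does with each record value:

```java
import java.io.*;

public class JdkSerializationRoundtrip {
    // Equivalent of SerializationUtils.serialize: object -> byte[]
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Equivalent of SerializationUtils.deserialize: byte[] -> object
    static Object deserialize(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hello kafka");
        System.out.println(deserialize(bytes)); // prints "hello kafka"
    }
}
```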
 

The test object (a constructor and toString are added so the producer and log output below work):

import java.io.Serializable;
import java.util.Date;

public class DemoObj implements Serializable {
    private Integer f1;
    private String f2;
    private Date f3;

    public DemoObj(Integer f1, String f2, Date f3) {
        this.f1 = f1;
        this.f2 = f2;
        this.f3 = f3;
    }

    @Override
    public String toString() {
        return "DemoObj{f1=" + f1 + ", f2='" + f2 + "', f3=" + f3 + "}";
    }
}

Testing

Producer

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

public class KafkaProducerObjectDemo {

    public static void main(String[] args) {
        //1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName()); // changed: serialize values as objects
        KafkaProducer<String, DemoObj> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            // changed: send a DemoObj as the record value
            ProducerRecord<String, DemoObj> record = new ProducerRecord<>("topic04", "key" + i, new DemoObj(i, UUID.randomUUID().toString(), new Date()));
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class KafkaConsumerObjectDemo {

    public static void main(String[] args) {
        //1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName()); // changed: deserialize values as objects
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");
        KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic04"));
        while (true){
            // changed: record values are now Objects
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                Iterator<ConsumerRecord<String, Object>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String, Object> next = iterator.next();
                    System.out.println(next.value());
                }
            }
        }
    }
}

Test result: the objects deserialize successfully.

DemoObj{f1=0, f2='22c16c50-f703-4b78-ba1e-8fb2f58d9801', f3=Tue Sep 28 23:06:38 CST 2021}
DemoObj{f1=3, f2='abd6089a-3ac8-4952-b184-d09f94336893', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=5, f2='4732aafa-8a24-42b1-941d-b5efd72395e9', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=8, f2='08a56491-f238-4bba-afa5-008a39577756', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=4, f2='7bf2cb8b-e898-4581-b3ea-fecda46ba899', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=6, f2='0c14cf73-96e3-4f89-999b-5662e86900bf', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=7, f2='aa88582b-59ef-449c-a86d-c147aa1815e7', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=1, f2='b94e35a0-69ce-4190-b111-b9a65e9d60ed', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=2, f2='57c54244-9522-4689-97b3-411bd2fc587c', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=9, f2='6a3c5526-c015-425b-bdda-e242b1fda780', f3=Tue Sep 28 23:06:39 CST 2021}
6. Custom Interceptors

Interceptors let you hook into record sending, for example to handle send failures.

Define the interceptor:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {

    // Wraps the message further before it is sent
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.key(), record.value() + "sffffffffff");
    }

    // Invoked on success or failure of the send
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println(metadata);
        System.out.println(exception);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Producer configuration:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerObjectInterceptorDemo {

    public static void main(String[] args) {
        //1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName()); // added: register the interceptor
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic07", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

The consumer is unchanged.

Consumer log

// The interceptor above rewrapped each ProducerRecord's value before sending;
// the consumer prints the rewritten values:
value0sffffffffff
value3sffffffffff
value5sffffffffff
value8sffffffffff
value1sffffffffff
value2sffffffffff
value9sffffffffff

Producer log

// onAcknowledgement is invoked after each send completes: the RecordMetadata, then the exception (null on success)
topic07-0@10
null
topic07-0@11
null
topic07-0@12
null