栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka多个消费者同时消费topic中的消息

kafka多个消费者同时消费topic中的消息

首先我们先模拟生产者代码

    
    @GetMapping("/kafka/many_consumer")
    public void manyConsumer() {
        for (int i = 0; i <1; i++) {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Long orderId = new SnowflakeGenerator().next();
            //key值取hash值对分区数量取模
            final Integer partition =Math.abs(orderId.hashCode())%3;
            final Order order = new Order();
            order.setOrderNo(orderId+"");
            order.setCreateTime(new Date());
            order.setPhone('1' + RandomUtil.randomNumbers(10));
            log.info("kafka 发送消息"+orderId + "分区====="+partition);
            kafkaService.sendMsg("topic.many.consumer",partition,orderId+"", JSON.toJSONString(order));
        }
    }
}

然后实现多个消费者同时消费topic中的消息

 @KafkaListener(groupId="many-consumer-id-1", topics = {"topic.many.consumer"})
    public void consumer1(ConsumerRecord record) {
        log.info("分组:many-consumer-id-1 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }
    
    @KafkaListener(groupId="many-consumer-id-2", topics = {"topic.many.consumer"})
    public void consumer2(ConsumerRecord record) {
        log.info("分组:many-consumer-id-2 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }
    @KafkaListener(groupId="many-consumer-id-3", topics = {"topic.many.consumer"})
    public void consumer3(ConsumerRecord record) {
        log.info("分组:many-consumer-id-3 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }

其实实现这个功能的重点在于@KafkaListener这个注解中的groupId是不同的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/280975.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号