栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot项目整合spring-kafka依赖发送消息、监听消息

Springboot项目整合spring-kafka依赖发送消息、监听消息

目录

一、 Springboot项目整合spring-kafka依赖包配置

二、配置文件修改增加生产者信息

三、发送消息

四、yml中增加消费者的配置

五、监听消息


一、 Springboot项目整合spring-kafka依赖包配置

添加pom文件

    
      org.springframework.kafka
      spring-kafka
    

二、配置文件修改增加生产者信息
spring:
  kafka:
    bootstrap-servers: xxx.74.xxx.xxxx:9092,112.xx.xx.240:9093,112.xxx.xxx.xxx:9094

    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all

三、发送消息
    @GetMapping("/api/v2/{num}")
    public void sendMessage1(@PathVariable("num") String num){

        kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{
            String topic = success.getRecordmetadata().topic();

            int partition = success.getRecordmetadata().partition();

            long offset = success.getRecordmetadata().offset();

            System.out.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset);

        },failure->{
            System.out.println("发送消息失败:"+failure.getMessage());
        });

    }

结果:

 

四、yml中增加消费者的配置
  consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest

      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false

      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4

五、监听消息
@Component
public class MQListener {


    @KafkaListener(topics = {"user.register.topic"},groupId = "wnn-gp1")
    public void onMessage(ConsumerRecord record, Acknowledgment ack,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){


        System.out.println("这是消费者在消费消息:"+record.topic()+"----"+record.partition()+"----"+record.value());

        ack.acknowledge();
    }

}

结果:

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/688233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号