栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot 整合 kafka 实现消息的发布和订阅

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot 整合 kafka 实现消息的发布和订阅

Kafka 是一个分布式、高吞吐量、可持久性和自动负载均衡的消息队列。它在实现了传统意义上的 MQ 功能的同时,也可以作为大数据的流处理平台。

简单来说,Kafka 就是一个高吞吐量的分布式发布订阅消息系统。

Kafka 的用法跟 RabbitMQ 用法相同,都是作为一个消息中间件收发消息,下面介绍的是 Springboot 微服务集成 Kafka,已经简单的用法说明。

依赖

Spring 有专门支持 Kafka 的依赖,引入 Spring 对应版本支持的 Kafka 依赖即可,如下

        
            org.springframework.kafka
            spring-kafka
        
配置
spring:
  kafka:
    bootstrap-servers: ${KAFKA_HOST:192.168.0.105:9092}
    #=============== producer  =======================
    producer:
      #如果该值大于零时,表示启用重试失败的发送次数
      retries: 0
      #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
      batch-size: 16384
      #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
      buffer-memory: 33554432
      #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer  =======================
    consumer:
      #用于标识此使用者所属的使用者组的唯一字符串
      group-id: consumer-group-default
      #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
      #可选的值为latest, earliest, none
      auto-offset-reset: earliest
      #消费者的偏移量将在后台定期提交,默认值为true
      enable-auto-commit: true
      #如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 100
      #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

默认 value-serializer 使用 org.apache.kafka.common.serialization.StringSerializer ,只支持文本消息。自定义 org.springframework.kafka.support.serializer.JsonSerializer 可以让消息支持其他类型。

使用示例

新建消息实体类

public class Message {
    private Long id;
    private String content;
    private Date sendTime;
	
	// constructor、getter、setter...
}

消息生产者控制器

@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);

    private static final String TOPIC = "topic-test";

    private KafkaTemplate kafkaTemplate;

    public KafkaProducerController(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/push")
    public ResponseEntity pushMessage(@RequestBody Message message) {
        Date time = new Date();
        message.setSendTime(time);
        kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(success
                -> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time), failure
                -> LOGGER.error("{}-生产者发送消息失败:{}", failure.getMessage()));
        return new ResponseEntity<>("success", HttpStatus.OK);
    }
}

消息消费者监听

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private static final String TOPIC = "topic-test";

    @KafkaListener(topics = {TOPIC})
    public void testConsumer(String body) {
        LOGGER.info("消费时间: {}", new Date());
        Message message = JSON.parseObject(body, Message.class);
        LOGGER.info("topic: {}, 消费消息内容: {}", TOPIC, message);
    }
}

上边的示例是生产者发送消息到 topic-test,消费者以默认组 consumer-group-default 身份监听 topic-test 消费消息,监听器用 @KafkaListener 注解,topics 属性表示监听的topic,支持同时监听多个,用英文逗号分隔,如果需要使用指定组身份消费消息,可通过注解中的 groupId 属性指定。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318207.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号