栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot集成kafka

springboot集成kafka

springboot 集成 kafka 的方法非常简单,直接上代码。

引入maven依赖

    org.springframework.kafka
    spring-kafka

引入此包后,通过 springboot 的自动装配机制,KafkaAutoConfiguration 类会被加载到 ioc 容器,同时它里面定义的一些 bean 也会被加载,比如 kafkaTemplate,后面发送消息时就会用到。

配置 application.yml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    listener:
      missing-topics-fatal: false
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: demo-group 
      auto-commit-interval: 100
      enable-auto-commit: true
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • missing-topics-fatal 如果设置为 true,当消费者监听的主题不存在时,会报错。所以需要设置为 false
  • consumer 的 group-id 是消费者组,自行设置
  • key 的序列化器和发序列化器,需要对应。value的同理
编写消息发送者

发送端通过 kafkaTemplate.send 方法发送消息

@Autowired
private KafkaTemplate kafkaTemplate;

private void sendMailMessage() {
	String[] ccMail = getMailReceiver();
	MailEntity mailEntity = new MailEntity();
	mailEntity.setReceiver("xxx.qq.com");
	mailEntity.setCc(ccMail);
	mailEntity.setSubject("测试邮件");
	mailEntity.setContent("测试邮件");
	kafkaTemplate.send("mail-send", JSON.toJSonString(mailEntity));
}
编写消息消费者

消费端通过 @KafkaListener 注解,并指定 topics 参数来接收消息。record.value() 中可以取得消息体的内容。 

@Component
public class MailSendConsumer {
    @Autowired
    private JavaMailSender javaMailSender;

    @KafkaListener(topics = { "mail-send" })
    public void sendMail(ConsumerRecord record){
        MailEntity entity = JSON.parseObject(record.value(), MailEntity.class);
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(sender);
        message.setTo(entity.getReceiver());
        message.setCc(entity.getCc());
        message.setSubject(entity.getSubject());
        message.setText(entity.getContent());
        javaMailSender.send(message);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281660.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号