springboot 集成 kafka 的方法非常简单,直接上代码。
引入maven依赖org.springframework.kafka spring-kafka
引入此包后,通过 springboot 的自动装配机制,KafkaAutoConfiguration 类会被加载到 ioc 容器,同时它里面定义的一些 bean 也会被加载,比如 kafkaTemplate,后面发送消息时就会用到。
配置 application.ymlspring:
kafka:
bootstrap-servers: 127.0.0.1:9092
listener:
missing-topics-fatal: false
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: demo-group
auto-commit-interval: 100
enable-auto-commit: true
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- missing-topics-fatal 如果设置为 true,当消费者监听的主题不存在时,会报错。所以需要设置为 false
- consumer 的 group-id 是消费者组,自行设置
- key 的序列化器和发序列化器,需要对应。value的同理
发送端通过 kafkaTemplate.send 方法发送消息
@Autowired private KafkaTemplate编写消息消费者kafkaTemplate; private void sendMailMessage() { String[] ccMail = getMailReceiver(); MailEntity mailEntity = new MailEntity(); mailEntity.setReceiver("xxx.qq.com"); mailEntity.setCc(ccMail); mailEntity.setSubject("测试邮件"); mailEntity.setContent("测试邮件"); kafkaTemplate.send("mail-send", JSON.toJSonString(mailEntity)); }
消费端通过 @KafkaListener 注解,并指定 topics 参数来接收消息。record.value() 中可以取得消息体的内容。
@Component
public class MailSendConsumer {
@Autowired
private JavaMailSender javaMailSender;
@KafkaListener(topics = { "mail-send" })
public void sendMail(ConsumerRecord record){
MailEntity entity = JSON.parseObject(record.value(), MailEntity.class);
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(sender);
message.setTo(entity.getReceiver());
message.setCc(entity.getCc());
message.setSubject(entity.getSubject());
message.setText(entity.getContent());
javaMailSender.send(message);
}
} 


