1.有一个可运行的SpringBoot项目
2.已经安装好zookeeper和kafka
没有安装好的请看我的上一篇博客安装kafka 地址如下:
执行步骤https://blog.csdn.net/zhanghengchao123/article/details/122149237
调用新增、编辑、删除之类的方法中,用kafka发送这个消息给其他方法监听
springboot项目集成kafka-
导入依赖
org.springframework.kafka spring-kafkacn.hutool hutool-all5.7.9 -
配置yml文件
kafka:
consumer:
zookeeper.connect: localhost:2181
servers: localhost:9092
enable.auto.commit: false
session.timeout: 60000
auto.commit.interval: 100
auto.offset.reset: latest
group.id: consumer-group
concurrency: 3
max_poll_interval_ms: 60000
max_poll_records: 20
producer:
servers: localhost:9092
retries: 3
batch.size: 16384
linger: 1
buffer.memory: 3355432
- 业务层方法调用kafka 发送消息
// 先注入kafka 模板类 @Autowired private KafkaTemplatekafkaTemplate; //kafka 不能直接发送实体类所以要 转换成json格式的字符串 JSON parse = JSONUtil.parse(你的实体类); kafkaTemplate.send("topicdemo",parse.toString());
- 消费者消费(建一个监听类如下)
@Slf4j
@Component
public class KafkaListenerDemo {
@KafkaListener(topics = "topicdemo",groupId = "consumer-group")
public void handle(ConsumerRecord, ?> record ){
Object value = record.value();
log.info("消费者进来了!"+record.toString());
DemoTeacher teacher=JSONUtil.toBean(value.toString(),DemoTeacher.class);
log.info("老师的值:--------"+teacher.toString());
}
}
- 执行你的springboot项目并调用业务方法输出



