Windows环境下Springboot 集成Kafka
由于kafka是基于zookeeper运行的,在kafka2.x的版本中,自带了zk,本文介绍的是基于2.X版本的集成
kafka下载地址
https://kafka.apache.org/downloads
下载完成后解压到指定目录,如果用自带的路径,后面命令后会报集群属性异常,因为自带的路径是不确定的,需要修改配置文件
修改/config文件下的 zookeeper.properties 和 server.properties
zookeeper.properties文件:
dataDir=D:kafkakafka_2.12-3.1.0tempzookeeper
server.properties文件:
log.dirs=D:kafkakafka_2.12-3.1.0tempkafka
启动kafka时,需要启动三个cmd命令窗口,因为不是后台启动,所以窗口不能关闭
zookeeper进入windows目录执行:
zookeeper-server-start.bat …/…/config/zookeeper.properties
kafka 进入windows目录执行:
kafka-server-start.bat …/…/config/server.properties
新建窗口用来执行 创建topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic demo
PS:2.X以上的版本由原先的 --zookeeper localhost:2181 更改为现在的 --bootstrap-server localhost:9092
硬性装备已经准备完成了,现在开始集成SpringBoot
引入依赖
org.springframework.kafka spring-kafka
修改appliaction后缀为.yml
server: port: 8080 spring: application: name: springboot-kafka kafka: #kafka的配置 bootstrap-servers: 127.0.0.1:9092 consumer: auto-commit-interval: 1000 #多长时间提交偏移量,自动提交 auto-offset-reset: earliest #设置偏移量 吐过kafaka中找不到当前消费者的偏移量,则直接将偏移量重置为最早的 enable-auto-commit: true #消费者的偏移量是自动提交还是手动提交。此处自动提交偏移量 group-id: springboot-consumer02 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: batch-size: 16384 #生产者每个批次放多少生产者 buffer-memory: 33554432 #生产者一端总的可用发送缓冲区大小、此处设置为32mb key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
创建生产者
@RestController
public class KafkaSyncProducerController {
@Resource
private KafkaTemplate stringKafkaTemplate;
//同步发送消息
@RequestMapping("/kafka/sync/{msg}")
public String sendMSG(@PathVariable("msg")String msg){
stringKafkaTemplate.send("demo", msg);
return "success";
}
}
创建消费者
@Component
public class KafkaConsumerController {
//设置监听器
@KafkaListener(topics = "demo")
public void listener(ConsumerRecord,?> record){
System.out.println("消费者消费到的消息:"+record.topic()+","+record.value());
}
}
发送请求
检查是否完成消费
参考文章:https://blog.csdn.net/zhaoyy0513/article/details/103904778



