一、安装kafka(前提是已安装了zookeeper),这里介绍docker安装方式
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=xxxx:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
-e KAFKA_BROKER_ID=0 broker标识,集群中用来区分不同的broker
-e KAFKA_ZOOKEEPER_ConNECT=10.9.44.11:2181/ 配置zookeeper地址
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置端口
注意:如果连接程序与kafka服务不在同一服务器上KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092这里的ip地址要设置成公网ip,否则在验证时会报:Connection to node 0 could not be established. Broker may not be available的错误。
二、导入依赖
org.springframework.kafka spring-kafkaorg.springframework.kafka spring-kafka-testtest
三、修改application.yml配置(配置详解看这位老哥的这里)
spring:
kafka:
bootstrap-servers: http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开
producer: # 生产者配置
retries: 0
batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送
buffer-memory: 33554432 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: # 消费者配置
group-id: test2 # 指定默认消费者group id
auto-offset-reset: latest # 消费位置起始点
enable-auto-commit: true # 设置自动提交offset
auto-commit-interval: 1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
application.properties配置
spring.kafka.bootstrap-servers=http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开 # 生产者配置 spring.kafka.producer.retries=0 # 每次批量发送消息的数量,produce积累到一定数据,一次发送 spring.kafka.producer.batch-size=16384 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.acks=1 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消费者配置 spring.kafka.consumer.group-id=test2 # 指定默认消费者group id spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true # 设置自动提交offset spring.kafka.consumer.auto-commit-interval=1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
四、启动类加上@EnableKafka注解
五、消息发送者
@Slf4j
@RestController
@RequestMapping("/webapi")
public class SysServerBusiController {
@Autowired
private KafkaTemplate kafkaTemplate;
}
@GetMapping("/send")
public String kafkaSendMessage(String topic, String message) {
log.info("==========kafka发送消息topic:{}, message:{}", topic, message);
kafkaTemplate.send(topic, message);
return JSONUtil.toJsonStr("send success");
}
}
六、消费者
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test")
private void onMessage(String message){
log.info("===========sys消费:{}", message);
}
}
七、测试验证



