This post records how to integrate Kafka with Spring Boot to use it as a message queue. The code covers JSON-serialized messages, a producer, consumers, and the configuration file.
1. Maven dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-json</artifactId>
</dependency>
2. Define the message to serialize
public class OrderMsg {

    private String orderId;
    private String accountId;

    public OrderMsg() {}

    public OrderMsg(String orderId, String accountId) {
        this.orderId = orderId;
        this.accountId = accountId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getAccountId() {
        return accountId;
    }

    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    @Override
    public String toString() {
        return "OrderMsg{" +
                "orderId='" + orderId + '\'' +
                ", accountId='" + accountId + '\'' +
                '}';
    }
}
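Before wiring the POJO into Kafka, a quick standalone sanity check of the constructor, getters, and toString can save a debugging round-trip later. A minimal sketch (the demo class name and the sample values A001/U100 are illustrative, not from the original):

```java
public class OrderMsgDemo {

    // Copy of the OrderMsg POJO above, trimmed to what the demo needs
    static class OrderMsg {
        private String orderId;
        private String accountId;

        OrderMsg(String orderId, String accountId) {
            this.orderId = orderId;
            this.accountId = accountId;
        }

        public String getOrderId() { return orderId; }
        public String getAccountId() { return accountId; }

        @Override
        public String toString() {
            return "OrderMsg{" +
                    "orderId='" + orderId + '\'' +
                    ", accountId='" + accountId + '\'' +
                    '}';
        }
    }

    public static void main(String[] args) {
        OrderMsg msg = new OrderMsg("A001", "U100");
        // Prints: OrderMsg{orderId='A001', accountId='U100'}
        System.out.println(msg);
    }
}
```

Note that Spring's JsonSerializer delegates to Jackson, which reads the payload from the public getters, so the default constructor and getter/setter pairs above are what make the class serializable.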
3. Define the message consumers

These consume the topic01 topic, which was already created when the Kafka cluster was set up, so it can be consumed directly here. Two consumers are defined, consumer01 and consumer02, both in the orderGroup consumer group, so they share the messages across topic01's three partitions.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    @KafkaListener(id = "consumer01", groupId = "orderGroup", topics = {"topic01"})
    public void processOrder1(ConsumerRecord<String, String> msg) {
        System.out.println("consumer01-" +
                "topic:" + msg.topic() +
                ";partition:" + msg.partition() +
                ";key:" + msg.key() +
                ";value:" + msg.value());
    }

    @KafkaListener(id = "consumer02", groupId = "orderGroup", topics = {"topic01"})
    public void processOrder2(ConsumerRecord<String, String> msg) {
        System.out.println("consumer02-" +
                "topic:" + msg.topic() +
                ";partition:" + msg.partition() +
                ";key:" + msg.key() +
                ";value:" + msg.value());
    }
}
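Because the configured value deserializer is JsonDeserializer and OrderMsg's package is listed under spring.json.trusted.packages, a listener can also receive the POJO directly instead of a raw ConsumerRecord. A minimal sketch, assuming the producer publishes OrderMsg with Spring's JsonSerializer (which adds the type headers the deserializer relies on); the class, method, and listener id here are illustrative, not from the original:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderMsgConsumer {

    // JsonDeserializer rebuilds the OrderMsg from the record's JSON payload,
    // so the listener method can declare the POJO type directly
    @KafkaListener(id = "orderMsgConsumer", groupId = "orderGroup", topics = {"topic01"})
    public void processOrderMsg(OrderMsg msg) {
        System.out.println("received order " + msg.getOrderId()
                + " for account " + msg.getAccountId());
    }
}
```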
4. Use KafkaTemplate to produce messages
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka")
public class OrderController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/order/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        System.out.println(message);
        // Send nine messages; the record key (i) determines the target partition
        for (int i = 1; i < 10; i++) {
            kafkaTemplate.send("topic01", i + "", i + "-" + message);
        }
        kafkaTemplate.flush();
    }
}
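KafkaTemplate.send() is asynchronous, so the controller above is fire-and-forget. To confirm delivery (or log failures), a callback can be attached to the returned future. A sketch assuming spring-kafka 2.x, where send() returns a ListenableFuture; the OrderProducer class and sendOrder method are illustrative, not from the original:

```java
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class OrderProducer {

    @Resource
    private KafkaTemplate<String, OrderMsg> kafkaTemplate;

    public void sendOrder(OrderMsg order) {
        // JsonSerializer (configured in application.yml) turns the POJO into JSON
        ListenableFuture<SendResult<String, OrderMsg>> future =
                kafkaTemplate.send("topic01", order.getOrderId(), order);
        future.addCallback(
                result -> System.out.println("sent to partition "
                        + result.getRecordMetadata().partition()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}
```

In spring-kafka 3.x, send() returns a CompletableFuture instead, so the callback would be attached with whenComplete.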
5. application.yml configuration
server:
  port: 8888
spring:
  kafka:
    # Kafka cluster
    bootstrap-servers: 192.168.216.118:9092,192.168.216.128:9092,192.168.216.138:9092
    producer:
      acks: all
      # Number of retries
      retries: 2
      # Batch size in bytes
      batch-size: 16384
      # Producer buffer size in bytes
      buffer-memory: 33554432
      # Serializer classes for key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Enable transactions
      # transaction-id-prefix: transaction-id-
      # Send delay:
      # the producer sends a batch once it has accumulated batch-size bytes
      # or linger.ms has elapsed; linger.ms of 0 means every record is sent
      # to Kafka as soon as it is received
      properties:
        enable:
          idempotence: true
        linger:
          ms: 0
    consumer:
      # Whether to auto-commit offsets
      enable-auto-commit: true
      # Auto-commit interval (how long after receiving a message the offset is committed)
      auto-commit-interval: 100ms
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the latest offset (consume only newly produced records)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      # Default consumer group ID
      group-id: group01
      # Consumer key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Consumer value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Maximum number of records returned per poll
      max-poll-records: 10
      properties:
        # Consumer session timeout
        session:
          timeout:
            ms: 120000
        heartbeat:
          interval:
            ms: 1000
        # Consumer request timeout
        request:
          timeout:
            ms: 120000
        spring:
          json:
            trusted:
              packages: com.studyplan.mq.kafka.bean
        #isolation:
          #level: read_committed
    listener:
      # Do not fail application startup when a listened-to topic does not exist
      missing-topics-fatal: false
      # Enable batch consumption
      # type: batch
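If the commented-out listener type above is switched to batch, each listener invocation receives up to max-poll-records records (10 here) as a list instead of one record at a time. A hedged sketch of what such a listener could look like (the class name and listener id are illustrative, not from the original):

```java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchOrderConsumer {

    // With spring.kafka.listener.type=batch, each invocation receives
    // up to max-poll-records records at once
    @KafkaListener(id = "batchConsumer", groupId = "orderGroup", topics = {"topic01"})
    public void processBatch(List<ConsumerRecord<String, String>> records) {
        System.out.println("received batch of " + records.size());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.partition() + ":" + record.value());
        }
    }
}
```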
6. Test in the browser

Open http://localhost:8888/kafka/order/world to test, and observe the console output.