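Contents
1 Abstract
2 Core Maven dependencies
3 Kafka service configuration
3 Message producer (Producer)
3.1 application configuration file
3.2 Producer: sending messages
3.3 Topic definition and message entity
3.4 Sending example (Controller)
3.5 SpringBoot startup class
4 Message consumer (Consumer)
4.1 application configuration file
4.2 Consumer: receiving messages
5 Testing
6 Recommended references
7 GitHub source code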
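1 Abstract
Kafka is primarily a stream-processing platform that is widely used for big-data workloads, and it can also serve as a message queue. This article shows how to integrate Kafka into SpringBoot 2.6 using spring-kafka 2.8.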
2 Core Maven dependencies
./demo-kafka-producer/pom.xml
./demo-kafka-consumer/pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Here the SpringBoot version is 2.6.3 and the spring-kafka version is 2.8.2.
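Full dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${springboot.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>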
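Corresponding versions
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <springboot.version>2.6.3</springboot.version>
    <hutool.version>5.7.21</hutool.version>
</properties>
3 Kafka service configuration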
Core Kafka broker configuration
server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://your.publicIP:9092

# A comma separated list of directories under which to store log files
log.dirs=/var/log/kafka/logs-1

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181
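The Kafka service must be reachable from outside the broker host, so advertised.listeners has to point to an address that the producer and consumer applications can actually reach.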
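Before starting the SpringBoot applications, it can help to confirm that the broker port is reachable from the machine that will run them. The following is a minimal sketch using only the JDK; the class name BrokerReachability and the 3-second timeout are assumptions for illustration, and the host and port are the bootstrap server used in this article's application.yml. An open TCP port only shows that the listener is up; clients must also be able to resolve and reach the address published in advertised.listeners.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the demo project.
class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host: all treated as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same address as spring.kafka.bootstrap-servers in this article.
        System.out.println("broker reachable: " + isReachable("172.16.140.10", 9092, 3000));
    }
}
```

If this prints false from the application host, the usual suspects are a firewall rule or a listener bound only to a local interface.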
3 Message producer (Producer)
3.1 application configuration file
./demo-kafka-producer/src/main/resources/application.yml
## config
## server
server:
  port: 8900
## spring
spring:
  application:
    name: demo-kafka-producer
  kafka:
    bootstrap-servers: 172.16.140.10:9092
    consumer:
      group-id: 1
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
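The spring.kafka.producer.* keys above are SpringBoot's kebab-case names for standard Kafka producer client settings. As a rough illustration of that correspondence, the sketch below builds the equivalent client property names with plain java.util.Properties. The class name ProducerPropertiesSketch is an assumption; the demo project itself relies on SpringBoot auto-configuration and does not contain this code.

```java
import java.util.Properties;

// Hypothetical illustration of the client-level names behind spring.kafka.producer.*
class ProducerPropertiesSketch {

    static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "172.16.140.10:9092"); // spring.kafka.bootstrap-servers
        props.setProperty("retries", "0");                            // producer.retries
        props.setProperty("batch.size", "16384");                     // producer.batch-size (bytes)
        props.setProperty("buffer.memory", "33554432");               // producer.buffer-memory (bytes)
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the properties in key=value form.
        producerProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Knowing the client-level names makes it easier to look up each setting in the Kafka producer configuration documentation.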
3.2 Producer: sending messages
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/common/mq/KafkaMQProducer.java
package com.ljq.demo.springboot.kafka.producer.common.mq;

import cn.hutool.json.JSONUtil;
import com.ljq.demo.springboot.kafka.producer.common.constant.KafkaMessageConst;
import com.ljq.demo.springboot.kafka.producer.model.entity.KafkaMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaMQProducer {

    // Key and value are both Strings, matching the StringSerializer settings in application.yml
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Wraps the text in a KafkaMessageEntity and sends it to the demo topic as JSON
    public void send(String msg) {
        KafkaMessageEntity kafkaMessage = new KafkaMessageEntity();
        kafkaMessage.setId("message-" + System.currentTimeMillis());
        kafkaMessage.setMessage(msg);
        String payload = JSONUtil.toJsonStr(kafkaMessage);
        log.info("kafka message: {}", payload);
        kafkaTemplate.send(KafkaMessageConst.KAFKA_TOPIC_DEMO, payload);
    }
}
3.3 Topic definition and message entity
Topic constants class
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/common/constant/KafkaMessageConst.java
package com.ljq.demo.springboot.kafka.producer.common.constant;

public class KafkaMessageConst {

    private KafkaMessageConst() {
    }

    public static final String KAFKA_TOPIC_DEMO = "kafka_topic_demo";
}
Message entity class
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/model/entity/KafkaMessageEntity.java
package com.ljq.demo.springboot.kafka.producer.model.entity;

import lombok.Data;

import java.io.Serializable;

@Data
public class KafkaMessageEntity implements Serializable {

    private static final long serialVersionUID = -3812375964256200394L;

    private String id;
    private String message;
}
3.4 Sending example (Controller)
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/controller/KafkaMessageController.java
package com.ljq.demo.springboot.kafka.producer.controller;

import com.ljq.demo.springboot.kafka.producer.common.mq.KafkaMQProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/api/demo/kafka")
public class KafkaMessageController {

    @Autowired
    private KafkaMQProducer kafkaMQProducer;

    // Publishes the "message" request parameter to Kafka and echoes it back
    @PostMapping(value = "/send", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> send(String message) {
        log.info("request path: {}, param: {}", "/api/demo/kafka/send", message);
        kafkaMQProducer.send(message);
        return ResponseEntity.ok(message);
    }
}
3.5 SpringBoot startup class
The topic must exist before messages are sent, so it is declared as a bean in the startup class.
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/DemoKafkaProducerApplication.java
package com.ljq.demo.springboot.kafka.producer;

import com.ljq.demo.springboot.kafka.producer.common.constant.KafkaMessageConst;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication
public class DemoKafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaProducerApplication.class, args);
    }

    @Bean
    public KafkaAdmin.NewTopics topics() {
        return new KafkaAdmin.NewTopics(TopicBuilder.name(KafkaMessageConst.KAFKA_TOPIC_DEMO).build());
    }
}
spring-kafka 2.8 can create several topics at once through a single KafkaAdmin.NewTopics bean, which keeps topic creation concise.
4 Message consumer (Consumer)
4.1 application configuration file
./demo-kafka-consumer/src/main/resources/application.yml
## config
## server
server:
  port: 8901
## spring
spring:
  application:
    name: demo-kafka-consumer
  kafka:
    bootstrap-servers: 172.16.140.10:9092
    consumer:
      group-id: 1
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
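On the consumer side, two details of the YAML are worth spelling out: auto-commit-interval is written as a duration (100ms) but the Kafka client expects plain milliseconds, and anything under properties is passed to the client under the key exactly as written. The sketch below shows the resulting client-level settings with plain JDK classes; the class name ConsumerPropertiesSketch is an assumption, and the demo project does not contain this code because SpringBoot performs the translation itself.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical illustration of the client-level names behind spring.kafka.consumer.*
class ConsumerPropertiesSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "172.16.140.10:9092");
        props.setProperty("group.id", "1");
        props.setProperty("enable.auto.commit", "true");
        // "100ms" in YAML is a duration; the client setting is a millisecond count.
        props.setProperty("auto.commit.interval.ms",
                String.valueOf(Duration.ofMillis(100).toMillis()));
        // Entries under "properties:" keep their key unchanged.
        props.setProperty("session.timeout.ms", "15000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the oldest available offset when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the properties in key=value form.
        consumerProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```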
4.2 Consumer: receiving messages
./demo-kafka-consumer/src/main/java/com/ljq/demo/springboot/kafka/consumer/common/mq/KafkaMQConsumer.java
package com.ljq.demo.springboot.kafka.consumer.common.mq;

import cn.hutool.json.JSONUtil;
import com.ljq.demo.springboot.kafka.consumer.common.constant.KafkaMessageConst;
import com.ljq.demo.springboot.kafka.consumer.model.entity.KafkaMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
public class KafkaMQConsumer {

    // Listens on the demo topic and converts each JSON payload back into a KafkaMessageEntity
    @KafkaListener(topics = {KafkaMessageConst.KAFKA_TOPIC_DEMO})
    public void receive(ConsumerRecord<?, ?> record) {
        log.info("record: {}", record);
        // Skip records whose value is null
        Optional.ofNullable(record.value())
                .ifPresent(message -> {
                    log.info("message: {}", JSONUtil.toBean(String.valueOf(message), KafkaMessageEntity.class));
                });
    }
}
5 Testing
Start the producer service and the consumer service.
Call the send endpoint.
Endpoint URL:
http://127.0.0.1:8900/api/demo/kafka/send?message=kakakakak
Request method: POST
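The request can be sent with any HTTP client. As one possible way to script it, here is a small sketch that uses only JDK classes available on Java 8. The class name SendRequestSketch and the 3-second timeouts are assumptions; the path and the message parameter come from the controller in section 3.4.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical test client, not part of the demo project.
class SendRequestSketch {

    /** Builds the send URL and URL-encodes the message parameter. */
    static String buildSendUrl(String baseUrl, String message) {
        try {
            return baseUrl + "/api/demo/kafka/send?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Sends an empty-body POST and returns the response body as text. */
    static String post(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = buildSendUrl("http://127.0.0.1:8900", "kakakakak");
        try {
            System.out.println(post(url));
        } catch (IOException e) {
            System.out.println("request failed (is the producer service running?): " + e.getMessage());
        }
    }
}
```

Since the controller returns the message it received, a successful call should echo the message text in the response body.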
Producer log output:
2022-03-01 18:27:53 | INFO | http-nio-8900-exec-6 | com.ljq.demo.springboot.kafka.producer.controller.KafkaMessageController 33| request path: /api/demo/kafka/send, param: kakakakak
2022-03-01 18:27:53 | INFO | http-nio-8900-exec-6 | com.ljq.demo.springboot.kafka.producer.common.mq.KafkaMQProducer 32| kafka message: {"message":"kakakakak","id":"message-1646130473195"}
Consumer log output:
2022-03-01 18:27:53 | INFO | org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 | com.ljq.demo.springboot.kafka.consumer.common.mq.KafkaMQConsumer 29| record: ConsumerRecord(topic = kafka_topic_demo, partition = 0, leaderEpoch = 0, offset = 5279, CreateTime = 1646130473195, serialized key size = -1, serialized value size = 52, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = {"message":"kakakakak","id":"message-1646130473195"})
2022-03-01 18:27:53 | INFO | org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 | com.ljq.demo.springboot.kafka.consumer.common.mq.KafkaMQConsumer 32| message: KafkaMessageEntity(id=message-1646130473195, message=kakakakak)
At this point, the SpringBoot integration with Kafka is complete.
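6 Recommended references
Official documentation: APACHE KAFKA QUICKSTART
Integrating Kafka with SpringBoot
How to Work with Apache Kafka in Your Spring Boot Application
Spring for Apache Kafka
A collection of common Kafka errors
UnknownHostException when a SpringBoot project connects to a remote Kafka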
7 GitHub source code
GitHub repository: https://github.com/Flying9001/springBootDemo