项目构建工具使用的是maven:
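二、搭建步骤
1.添加配置文件
pom.xml的配置如下:
<project>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka</artifactId>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>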
application.properties的配置代码如下:
spring.application.name=springboot-kafka-02
server.port=8080
# 用于建立初始连接的broker地址
spring.kafka.bootstrap-servers=192.168.42.21:9092
# producer用到的key和value的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默认的批处理大小(单位:字节)
spring.kafka.producer.batch-size=16384
# 32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
# consumer用到的key和value的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-consumer-02
# 是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
2.添加相关的项目包结构
3.添加SpringBoot启动主类
package com.kafka.learn;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the Spring Boot Kafka demo application.
 */
@SpringBootApplication
public class KafkaDemoApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(KafkaDemoApplication.class);
        application.run(args);
    }
}
4.添加Kafka配置信息类
package com.kafka.learn.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Registers the {@link NewTopic} beans describing the topics used by the demo.
 */
@Configuration
public class KafkaConfig {

    /** Replication factor shared by every topic declared in this class. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Describes {@code topic1} with 5 partitions.
     *
     * @return the topic definition for {@code topic1}
     */
    @Bean
    public NewTopic topic1() {
        return describeTopic("topic1", 5);
    }

    /**
     * Describes {@code topic2} with 3 partitions.
     *
     * @return the topic definition for {@code topic2}
     */
    @Bean
    public NewTopic topic2() {
        return describeTopic("topic2", 3);
    }

    /** Builds a topic definition with the given name and partition count. */
    private static NewTopic describeTopic(String name, int partitionCount) {
        return new NewTopic(name, partitionCount, REPLICATION_FACTOR);
    }
}
5.添加Kafka消息生产者
在Controller层添加相关的消息生产接口,主要有同步发送、异步发送
同步发送Kafka消息生产者的业务接口设计:
package com.kafka.learn.controller;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.ExecutionException;
/**
 * REST endpoints that publish messages to Kafka and, for {@code sendSync},
 * block until the send has completed.
 */
@RestController
@RequestMapping("/kafka/sync")
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    /**
     * Sends {@code message} to partition 0 of {@code topic-spring-02} with key 1
     * and waits for the send to complete.
     *
     * @param message the message payload taken from the request path
     * @return {@code "success"} once the send has completed, {@code "failure"}
     *         if waiting was interrupted or the send failed
     */
    @RequestMapping(value = "/send/{message}", method = RequestMethod.GET)
    public String sendSync(@PathVariable("message") String message) {
        ProducerRecord<Integer, String> record =
                new ProducerRecord<>("topic-spring-02", 0, 1, message);
        ListenableFuture<SendResult<Integer, String>> future = template.send(record);
        try {
            // Block until the send completes so the outcome is known before responding.
            SendResult<Integer, String> result = future.get();
            System.out.println(result.getRecordMetadata().topic()
                    + result.getRecordMetadata().partition()
                    + result.getRecordMetadata().offset());
            return "success";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // Reaching this point means the send was not confirmed.
        return "failure";
    }

    /**
     * Sends a fixed greeting to the given topic and partition without waiting
     * for the result. The partition number is also used as the record key.
     *
     * @param topic     the destination topic taken from the request path
     * @param partition the destination partition; defaults to 0
     */
    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMessageToTopic(@PathVariable("topic") String topic,
                                   @RequestParam(value = "partition", defaultValue = "0") int partition) {
        System.out.println("开发发送消息给kafka:" + topic);
        template.send(topic, partition, partition, "你好,kafka");
    }
}
异步发送Kafka消息生产者的业务接口设计:
package com.kafka.learn.controller;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/kafka/async")
public class KafkaAsyncProducerController {
@Autowired
private KafkaTemplate template;
@RequestMapping("/send/{message}")
public String asyncSend(@PathVariable("message") String message) {
ProducerRecord record = new ProducerRecord<>(
"topic-spring-02",
0,
3,
message
);
ListenableFuture> future = template.send(record);
// 添加回调,异步等待响应
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult result) {
System.out.println("发送成功:" +
result.getRecordmetadata().topic() + "t"
+ result.getRecordmetadata().partition() + "t"
+ result.getRecordmetadata().offset());
}
});
return "success";
}
}
6.添加Kafka消息消费者
package com.kafka.learn.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class MyConsumer {
@KafkaListener(topics = "topic-spring-02")
public void onMessage(ConsumerRecord record) {
Optional> optional =
Optional.ofNullable(record);
if (optional.isPresent()) {
System.out.println(
record.topic() + "t"
+ record.partition() + "t"
+ record.offset() + "t"
+ record.key() + "t"
+ record.value());
}
}
@KafkaListener(id = "listen01",
topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0", "3" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
})
public void listen(ConsumerRecord, ?> record) {
System.out.println("topic" + record.topic());
System.out.println("key:" + record.key());
System.out.println("value:"+record.value());
}
}
三、测试结果
测试kafka消息的同步发送:接口 --- http://localhost:8080/kafka/sync/send/你好,kafka
后端消费结果:
2021-11-12 10:54:20.143 INFO 20160 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2021-11-12 10:54:20.144 INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2021-11-12 10:54:20.150 INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 6 ms 2021-11-12 10:54:20.186 INFO 20160 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [192.168.42.21:9092] buffer.memory = 33554432 client.dns.lookup = default client.id = compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1 2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01 2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1636685660203 2021-11-12 10:54:20.213 INFO 20160 --- [ad | producer-1] org.apache.kafka.clients.metadata: [Producer clientId=producer-1] Cluster ID: BNUXbtH2RYmLyhUrarFpBw topic-spring-02 0 1 1 你好,kafka topic-spring-0201
测试kafka消息的异步发送:接口 --- http://localhost:8080/kafka/async/send/你好async,kafka
后端消费结果:
开发发送消息给kafka:topic1 topictopic1 key:3 value:你好,kafka
测试发送到消费者消费外的分区:http://localhost:8080/kafka/sync/topic1/send?partition=4
后端消费结果(无消费日志):
开发发送消息给kafka:topic1
测试kafka消息的指定分区消费的topic2的同步发送:
接口 --- http://localhost:8080/kafka/sync/topic2/send?partition=0
后端消费结果:
开发发送消息给kafka:topic2 topictopic2 key:0 value:你好,kafka
总结
SpringBoot集成Kafka很简单,相关的配置信息很多都已经被Kafka相关的项目组设置好,我们只要配置一些必需的参数,即可完成对Kafka的集成。当然,要是需要对Kafka的发送和消费做业务上的限制,就需要我们去做二次封装了,比如:使用自定义的序列化方式、Kafka的生产者使用自定义的封装对象、限制发送的内容等等。
Kafka简单集成使用的是单机场景下的配置,若考虑集群情况下,还需要考虑一下kafka相关的配置参数,需要按自己的业务需求去调整。



