一.基本版本信息
springboot2.5.5 kafka.2.4.1
二.代码部分
2.1pom.xml
4.0.0 org.springframework.boot spring-boot-starter-parent2.5.5 com.daqu springboot-kafka0.0.1-SNAPSHOT springboot-kafka Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-weborg.springframework.kafka spring-kafkaorg.springframework.boot spring-boot-starter-testtest org.springframework.kafka spring-kafka-testtest org.projectlombok lombokorg.springframework.boot spring-boot-maven-plugin
2.2application.properties 文件中参数 是可以设置在KafkaTemplate
server.port=8088 spring.application.name=springboot-kafak #kafka 配置 spring.kafka.bootstrap-servers=node01:9092 #producer peizhi spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #设置生产者每个批次需要放多少的数据 spring.kafka.producer.batch-size=16384 #生产者分区总 可用发送缓冲区大小,设置为32MB spring.kafka.producer.buffer-memory=33554432 #consumer 配置 spring.kafka.consumer.key-serialization=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.consumer.value-serialization=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.group-id=springboot-consumer01 #设置offset kafka找不到offset 将按照设置的最早的offset spring.kafka.consumer.auto-offset-reset=earliest #消费者的提交的方式 自动? 手动? 事务需要手动提交 spring.kafka.consumer.enable-auto-commit=true #设置提交自动提交量的 时间 spring.kafka.consumer.auto-commit-interval=1000
2.3kafkaConfig 在这里面可以重写很多默认的参数,参数
@Configuration
public class kafkaConfig {
//创建新的主题
// ./kafka-topics.sh --bootstrap-server node01:2181 --list
@Bean
public NewTopic topic1(){
return new NewTopic("nptc-01",1,(short)1);
}
//重新设置producer
@Autowired
@Bean
public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory){
HashMap configOverride = new HashMap<>();
configOverride.put(ProducerConfig.BATCH_SIZE_CONFIG,20000);
configOverride.put(ProducerConfig.ACKS_CONFIG,"all");
//生产者发送数据最大消息的大小 1M
configOverride.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
//表示生产端消息发送失败时的重试次数,默认值为0
configOverride.put(ProducerConfig.RETRIES_CONFIG,3);
//表示生产端是否对消息进行压缩,默认值为none,即不压缩消息 综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。
configOverride.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//表示生产端消息缓冲池或缓冲区的大小,默认值为33554432,即32M
configOverride.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//表示生产端与broker之间的每个连接最多缓存的请求数,默认值为5,即每个连接最多可以缓存5个未响应的请求。这个参数通常用来解决分区乱序的问题 为了避免消息乱序问题,建议将该参数设置为1
configOverride.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);
// 这个参数表示生产端发送请求后等待broker端响应的最长时间,默认值为30000
configOverride.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
KafkaTemplate template = new KafkaTemplate<>(producerFactory, true, configOverride);
return template;
}
}
2.4 kafkaSyncController 同步发送数据 可以默认的在kafka中创建topic
@Slf4j
@RestController
public class kafkaSyncController {
@Autowired
private KafkaTemplate template;
@GetMapping("/sendData/sync/{message}")
public String sendData(@PathVariable String message) throws ExecutionException, InterruptedException {
ListenableFuture> future = template.send("springboot-kafka-01", 0, 0, message);
//同步发送数据
SendResult SendResult = future.get();
String topic = SendResult.getRecordmetadata().topic();
long offset = SendResult.getRecordmetadata().offset();
int partition = SendResult.getRecordmetadata().partition();
log.info("topic:{},offset:{},partition:{}" ,topic,offset,partition);
return "success";
}
}
2.5 kafkaAsyncController 异步发送数据
@RestController
@Slf4j
public class kafkaAsyncController {
@Autowired
private KafkaTemplate template;
@RequestMapping("/sentdata1/{message}")
public String sendData1(@PathVariable String message){
ListenableFuture> send = template.send("springboot-kafka-02", 0, 1, message);
send.addCallback(new ListenableFutureCallback>() {
//设置回调函数 异步等待broker 端返回结果
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:" + ex.getMessage());
}
@Override
public void onSuccess(SendResult result) {
Recordmetadata metadata = result.getRecordmetadata();
int partition = metadata.partition();
long offset = metadata.offset();
String topic = metadata.topic();
log.info("发送消息成功:");
log.info("topic:{},offset:{},partition:{}" ,topic,offset,partition);
}
});
return "success";
}
}
2.6 kafkaConsumer
@Slf4j
@Component
public class kafkaConsumer {
@KafkaListener(topics ="springboot-kafka-01" )
public void onMessage(ConsumerRecord record) {
log.info("消费者收到的消息");
log.info("topic:{},offset:{},partition:{},key:{},value:{}" ,
record.topic(),
record.offset(),
record.partition(),
record.key(),
record.value());
}
}
可以用postman 触发接口
http://localhost:8088//sentdata1/hello
可以看到输出台日志中已经接收到数据了.



