目录:简介 / jar引入 / 配置文件 / KafkaConfiguration消费工程配置 / kafka消息发送、消费示例
简介
本示例演示kafka在springboot中的配置、消息发送及消息消费的代码用法。
jar引入
代码示例:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
配置文件
#kafka配置
#指定kafka代理地址(集群可配置多个,中间用逗号隔开)
spring.kafka.bootstrap-servers=ip:9092
#producer生产环境配置===========================
#重试次数
spring.kafka.producer.retries=1
#默认批量大小(producer积累到一定数据后一次发送)
spring.kafka.producer.batch-size=16384
#缓冲总内存大小(32M)
spring.kafka.producer.buffer-memory=33554432
#消息key的序列化方式(kafka原生的StringSerializer)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#消息value的序列化方式(kafka原生的StringSerializer)
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#consumer消费环境配置===========================
#消费者标识字符串(自定义、标记消费者是谁;该key为自定义属性,由KafkaConfiguration通过@Value读取)
spring.kafka.consumer.boot.group-id=boot_group_id
#kafka偏移量设置(earliest:从头开始消费)
spring.kafka.consumer.auto-offset-reset=earliest
#在一次 poll() 调用中返回的最大记录数
spring.kafka.consumer.max-poll-records=100
#设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#消费者偏移自动提交给Kafka的频率(以毫秒为单位)、默认值为5000
spring.kafka.consumer.auto-commit-interval=1000
#消息key的反序列化方式(kafka原生的StringDeserializer)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#消息value的反序列化方式(kafka原生的StringDeserializer)
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
KafkaConfiguration消费工程配置
代码示例:
package com.gxl.springbootproject.config.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Value("${spring.kafka.consumer.boot.group-id}")
private String bootGroupId;
@Bean
public Map consumerConfigs() throws ClassNotFoundException {
Map props = new HashMap<>();
//指定kafka代理地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//kafka偏移量设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//在一次 poll() 调用中返回的最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//设置自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
//消费者偏移自动提交给Kafka的频率(以毫秒为单位)、默认值为5000
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
//kafka原生的StringSerializer编码序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeserializer));
//kafka原生的StringSerializer解码序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeserializer));
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
return props;
}
@Bean
public KafkaListenerContainerFactory> boot_batchFactory() throws ClassNotFoundException {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
Map props = consumerConfigs();
//指定消费者group-id
props.put(ConsumerConfig.GROUP_ID_CONFIG, bootGroupId);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
kafka消息发送、消费示例
代码示例:
package com.gxl.springbootproject.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoint that publishes messages to Kafka, plus the listener that
 * consumes them from the same topic.
 */
@Api(tags = "kafka接口管理")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {

    /** Topic shared by the producer endpoint and the listener so the two cannot drift apart. */
    private static final String BOOT_TOPIC = "boot_topic";

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code message} to the {@code boot_topic} topic.
     *
     * @param message message payload taken from the {@code message} request parameter
     */
    @ApiOperation("kafka消息发送")
    @PostMapping("/send/message")
    public void send(@RequestParam("message") String message) {
        // The result of send() is not inspected here, so the log line below only
        // records that the message was handed to the template.
        kafkaTemplate.send(BOOT_TOPIC, message);
        log.info("kafka消息发送成功,message={}", message);
    }

    /**
     * Consumes records from {@code boot_topic} through the {@code boot_batchFactory}
     * container factory. That factory enables batch listening, so the records of one
     * poll arrive together as a list.
     *
     * @param messages payloads of the records delivered in one batch
     */
    @KafkaListener(topics = BOOT_TOPIC, containerFactory = "boot_batchFactory")
    public void bootTopic(java.util.List<String> messages) {
        for (String message : messages) {
            log.info("kafka消息接收成功,message={}", message);
        }
    }
}



