前言:
本文是建立在已整合nacos 的基础上进行的扩展,如需要整合Nacos 可以参考:
https://blog.csdn.net/l123lgx/article/details/121624988
本文是建立在服务器已经部署Kafka的基础只上进行的整合,如需要部署Kafka可以参考:https://blog.csdn.net/l123lgx/article/details/122047659
1 Kafka 介绍:
Spring for Apache Kafka (spring-kafka) 项目将核心 Spring 概念应用于基于 Kafka 的消息传递解决方案的开发;
官网地址:https://docs.spring.io/spring-kafka/docs/current/reference/html/
2 springcloud 整合kafka:
2.1 引入kafka jar:
org.springframework.kafka spring-kafka 2.6.0
2.2 nacos kafka 配置文件:
spring:
kafka:
# kafka 服务地址
bootstrap-servers: kafkaip:9092
# 消费者配置
consumer:
autostartup: false
# 消费者分组
group-id: consumer-1
properties:
# 设置从什么位置进行消费,这里设置从最早开始消费
auto-offset-reset: earliest
# 消费偏移量是否自动提交,这里设置false
enable-auto-commit: false
# 心跳间隔
heartbeat-interval-ms: 2000
# enable-auto-commit为true 时使用,自动提交的间隔时间
auto-commit-interval-ms: 500
# kafka 每次拉取的记录数
max-poll-records: 10
# session 超时时间
session-timeout-ms: 6000
# 加密
security-protocol: SASL_PLAINTEXT
sasl-mechanism: PLAIN
# 加密的实现类及配置 账号和密码
sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
# 生产者配置
producer:
# 生产者id
client-id: producer-1
properties:
# 失败时重试次数
retries: 3
# 批量发送的消息数量
batch-size: 16384
# 32MB的批处理缓冲区
buffer-memory: 33554432
# 发送信息时所有的机器都写入成功,此消息为发送成功
acks: all
# 加密
security-protocol: SASL_PLAINTEXT
sasl-mechanism: PLAIN
sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
2.3 kafka 配置文件kafkaConfig.java:
import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.Map;
@Configuration
@EnableKafka
public class kafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.properties.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.properties.auto-commit-interval-ms}")
private Integer autoCommitInterval;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.properties.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.properties.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.properties.security-protocol:PLAINTEXT}")
private String securityProtocol;
@Value("${spring.kafka.consumer.properties.sasl-mechanism:GSSAPI}")
private String saslMechanism;
@Value("${spring.kafka.consumer.properties.sasl-jaas-config:null}")
private String saslJaasConfig;
@Value("${spring.kafka.consumer.autoStartup:false}")
private boolean autoStartup;
@Value("${spring.kafka.producer.properties.retries}")
private Integer retries;
@Value("${spring.kafka.producer.properties.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.properties.buffer-memory}")
private Integer bufferMemory;
@Value("${spring.kafka.producer.properties.acks}")
private String acks;
@Value("${spring.kafka.producer.client-id}")
private String clientId;
@Value("${spring.kafka.producer.properties.security-protocol:PLAINTEXT}")
private String securityProtocolProducer;
@Value("${spring.kafka.producer.properties.sasl-mechanism:GSSAPI}")
private String saslMechanismProducer;
@Value("${spring.kafka.producer.properties.sasl-jaas-config:null}")
private String saslJaasConfigProducer;
@Value("${spring.cric.bi.environment:dev}")
private String environment;
@Bean
public Map producerConfigs() {
Map props = Maps.newHashMap();
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG,clientId);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
props.put("security.protocol", securityProtocolProducer);
props.put("sasl.mechanism", saslMechanismProducer);
props.put("sasl.jaas.config", saslJaasConfigProducer);
return props;
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate kafkaTemplateCustomer() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map batchConsumerConfigs() {
Map props = Maps.newHashMap();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
props.put("security.protocol", securityProtocol);
props.put("sasl.mechanism", saslMechanism);
props.put("sasl.jaas.config", saslJaasConfig);
return props;
}
@Bean
public KafkaListenerContainerFactory> batchFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
factory.setConcurrency(1);
//设置提交偏移量的方式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// if (!autoStartup){
// factory.setAutoStartup(false);
// }
return factory;
}
}
3 测试:
3.1 测试生产者 KafkaTestProducerController.java:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RestController
public class KafkaTestProducerController {
static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//24小时制
@Autowired
@Qualifier("kafkaTemplateCustomer")
private KafkaTemplate kafkaTemplate;
@RequestMapping(value ="message/send", method = RequestMethod.GET, produces = "application/json;charset=utf-8")
public Map send(@RequestParam String msg){
Map mapData = new HashMap<>();
try {
ListenableFuture data = kafkaTemplate.send("test1","weixin_123");
ListenableFuture data1 = kafkaTemplate.send("test1","weixin_123");
ListenableFuture data2 = kafkaTemplate.send("test1","weixin_123");
Object obj = data.get();
mapData.put("success",true);
}catch (Exception e){
log.error(e.getMessage());
mapData.put("success",false);
mapData.put("errorMs",e.getMessage());
}
return mapData;
}
}
3.2 测试消费者KafkaTestConsumer.java:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class KafkaTestConsumer {
@KafkaListener(topics = "test1",containerFactory = "batchFactory")
public void articleConsumerWx(List> records, Consumer consumer){
Long time1 = System.currentTimeMillis();
records.stream().forEach(e->{
System.out.println("e.value() = " + e.value());
});
// 提交偏移量
consumer.commitAsync();
// consumer.commitSync();
Long time2 = System.currentTimeMillis();
log.debug("消费{}条数据,耗时:{}",records.size(),(time2-time1)/1000);
}
}
参考:https://docs.spring.io/spring-kafka/docs/current/reference/html



