java应用程序接入kafka的方式非常多,在不同的架构体系有着不同的接入方式。比如
SpringMVC项目,可以使用kafka-clients
SpringBoot项目,可以使用Spring-kafka
SpringCloud项目,可以使用Spring-cloud-starter-stream-kafka
其中kafka-clients的通用性比较强,生产者、消费者都需要程序员手动去配置,也比较灵活,对于初学者而言,还是比较推荐此方式去开发。
不过,当我们使用SpringBoot项目时,尽可能以“规约大于配置”的方式去进行项目开发,可以缩短开发成本(尽管屏蔽了很多底层原理)。
本文将以Spring-kafka的方式给大家介绍,如何将kafka集成到SpringBoot应用中。
配置依赖配置
pom.xml
org.springframework.kafka spring-kafka
kafka配置
bootstart.yml
spring:
kafka:
bootstrap-servers: 10.14.8.149:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
consumer:
enable-auto-commit: false
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
listener:
concurrency: 1 # 配置消费者数量
ack-mode: manual # 手动模式
kafka初始化配置
KafkaConfig.java
@Component
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory
kafka生产者
@Autowired KafkaTemplate kafkaTemplate;
消费者
@KafkaListener(topics = {KafkaConstants.TOPIC_WHALE_DATA}, groupId = KafkaConstants.GROUP_WHALE_DATA, containerFactory = "kafkaListenerContainerFactory")
@Transactional(rollbackFor = Exception.class)
public void receiveMsg(ConsumerRecord record, Acknowledgment ack) throws Exception {
logger.info("kafka processMessage start");
logger.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());
// do something...
ack.acknowledge();
logger.info("kafka processMessage end");
}



