栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

spring-kafka集成demo

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spring-kafka集成demo

前言

java应用程序接入kafka的方式非常多,在不同的架构体系有着不同的接入方式。比如

SpringMVC项目,可以使用kafka-clients
SpringBoot项目,可以使用Spring-kafka
SpringCloud项目,可以使用Spring-cloud-starter-stream-kafka

其中kafka-clients的通用性比较强,生产者、消费者都需要程序员手动去配置,也比较灵活,对于初学者而言,还是比较推荐此方式去开发。

不过,当我们使用SpringBoot项目时,尽可能以“规约大于配置”的方式去进行项目开发,可以缩短开发成本(尽管屏蔽了很多底层原理)。

本文将以Spring-kafka的方式给大家介绍,如何将kafka集成到SpringBoot应用中。

配置

依赖配置
pom.xml


    org.springframework.kafka
    spring-kafka

kafka配置
bootstart.yml

spring:
  kafka:
    bootstrap-servers: 10.14.8.149:9092
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
    consumer:
      enable-auto-commit: false
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    listener:
      concurrency: 1 # 配置消费者数量
      ack-mode: manual # 手动模式

kafka初始化配置
KafkaConfig.java

@Component
public class KafkaConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory kafkaConsumerFactory,
            KafkaTemplate template) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        //最大重试三次
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        BiFunction, Exception, TopicPartition> DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "_retry", cr.partition());
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,DEFAULT_DESTINATION_RESOLVER),backOff));
        return factory;
    }
}

kafka生产者

@Autowired
KafkaTemplate kafkaTemplate;

消费者

@KafkaListener(topics = {KafkaConstants.TOPIC_WHALE_DATA}, groupId = KafkaConstants.GROUP_WHALE_DATA, containerFactory = "kafkaListenerContainerFactory")
@Transactional(rollbackFor = Exception.class)
public void receiveMsg(ConsumerRecord record, Acknowledgment ack) throws Exception {
    logger.info("kafka processMessage start");
    logger.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());
    // do something...
    ack.acknowledge();
    logger.info("kafka processMessage end");
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/777665.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号