栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka消费者处理消息失败后,无限消费该失败消息的问题

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka消费者处理消息失败后,无限消费该失败消息的问题

结论:如果消费者开启了批量消费的配置,那么必须同时配置一个当批量消费出现异常的处理器。否则仅配置启动消费者批量消费是会出现问题的。

@Bean("batchConsumerFactory")
    public KafkaListenerContainerFactory consumerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.setBatchListener(true);
        // 批量消费失败时的处理函数
        factory.setBatchErrorHandler((e, consumerRecords) -> {
            for (ConsumerRecord consumerRecord : consumerRecords) {
                log.error("批量消费异常:{}, 请求参数:{}", e.getMessage(), consumerRecord);
            }
        });
        // 设置消费者poll方法长轮询时间
        factory.getContainerProperties().setPollTimeout(500);
        return factory;
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781457.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号