栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka踩坑记

Kafka踩坑记

重试消费 之前认为手动提交可以重试消费

之前一直认为将消费端设置为手动提交,如果消费者方法程序因为逻辑异常就不提交当前的offset,只有正常执行完毕才提交,认为不提交的offset会自动放到kafka自动重试再次消费,然而其实并非如此,Consumder程序消费的offset只有在程序第一次启动采取服务端(_consumer_offsets队列中获取),然后在此基础上递增offset进行消费,也就是offset是一直增长的,不管是否有的消息没有被提交,如果有的offset没有提交,会在程序重新启动时,会去拉取那些没有提交的offset进行消费

通过异常的方式实现重复消费

所以手动提交无法实现,就通过在程序中手动抛异常的方式,然后通过设置setErrorHandler捕获到异常进行重试的,代码 配置如下:

异常可以拿到consumerRecord 消息对象,进而获取到所属队列,消息内容,然后就可以将该消息传入其他队列(比如自定义的死信队列)进行再次消费或者记录日志

Configuration
public class KafkaAckCusumerConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Value("${spring.kafka.producer.key-serializer}")
    private  String producerKeySer;
    @Value("${spring.kafka.producer.value-serializer}")
    private  String producerValSer;

    
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String consumerKeySer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String consumerValSer;
    
    @Value("${spring.kafka.consumer.max-poll-records}")
    private  String maxPollRecords;
    
    @Value("${spring.kafka.consumer.session.timeout}")
    private String sessionTimeout;
    
    @Value("${spring.kafka.consumer.heartbeat.interval}")
    private  String heartBeatInter;
    
    @Value("${spring.kafka.consumer.pollTimeout}")
    private String pollTimeOut;
        private static final Logger log = LoggerFactory.getLogger(ClusterStatePublisher.AckListener.class);

        private Map consumerProps() {

            Map props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,consumerKeySer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,consumerValSer);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,heartBeatInter);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"kcy");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

            return props;

        }

        @Bean("ackContainerFactory")
        public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {

            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));


            // 最大重试次数3次
            SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
                log.error("队列:{}异常,消息内容:{}.抛弃这个消息==,{}", consumerRecord.topic(),consumerRecord.toString(), e);
            }, new FixedBackOff(1000, 3));
            factory.setErrorHandler(seekToCurrentErrorHandler);


            // ack模式,详细见下文注释
           // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

            return factory;

        }


}
auto.offset.reset

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest
当创建一个新分组的消费者时,auto.offset.reset值为latest时,表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746971.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号