栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka重复消费问题

kafka重复消费问题

事件经过

自己本地写了一个kafka的demo,起了一个消费者

@KafkaListener(topics = KafkaConstants.SMS_TOPIC_NAME, groupId = "sms")
    public void listenGroup(ConsumerRecord record, Acknowledgment ack){

        String value = record.value();
        System.out.println("value1:" + value);
        System.out.println("record1:" + record);

        SmsThreadPool.submit(value);
        ack.acknowledge();
    }

但是如果过两天,offset就会从0开始重新消费。

解决

见kafka官方文档https://kafka.apache.org/documentation.html#upgrade

  • offsets.retention.minutes

    After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.

    Type:int
    Default:10080
    Valid Values:[1,...]
    importance:high
    Update Mode:read-only

当一个消费组的所有消费者断开链接后,offset的位移量会保留的时间,默认10080分钟,即7天。

Notable changes in 2.0.0

  • KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config offsets.retention.minutes to 1440.

2.0.0版本之后,默认值是7天,之前版本是1天。

我的本地版本是1.0.1,所以保留时间是1天,推测是2天后启动,offset就被重置了。

所以安全起见,修改配置文件,将offsets.retention.minutes=10080,与log.retention.hours默认的168(7天)一致。文件路径:kafka/config/server.properties

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/688332.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号