从Producer和Consumer两个角度分析重复消费的问题。
Producer端 消息重复场景Producer的send()方法可能会出现异常,配合生产者参数retries>0,生产者会在出现可恢复异常的时候进行重试。
若出现不可恢复异常的时候,配合send()的异步发送方式,则可能在回调函数中进行消息重发。上述均可能导致消息重复。
解决方法Kafka的幂等性就是为了避免出现生产者重试的时候出现重复写入消息的情况。
开启幂等性功能配置(该配置默认为false)如下:
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);Consumer端 重复消费场景 一、自动提交消费位移
kafka默认消费位移的提交是自动提交,由消费者参数enable.auto.commit配置,默认为true。
这个自动提交并不是每消费一条消息就自动提交消费位移,而是定期提交,这个定期提交的时间由客户端参数auto.commit.interval.ms配置,默认5秒。
也就是说,在默认情况下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。
注意:自动提交的动作在poll()方法的逻辑中,在每次向服务器端发起拉取请求之前会检查是否可以进行位移提交,如果可以就提交上一次轮询的位移。
如果在拉取消息进行消费,但是下一次提交位移之前消费者崩溃了,或者在消费者关闭之前调用了consumer.unsubscribe()方法取消订阅,那么下一次就还得在上一次消费位移的位置重新开始消费,造成重复消费!
解决办法设置手动提交消费位移。
二、手动提交消费位移当开启手动提交消费位移之后,依然会出现重复消费的场景。
例如当我们的代码逻辑是拉取消息之后先处理消息,然后进行位移提交。
若处理消息的时候,提交位移之前消费者宕机,消费者重启后,则会出现重复消费的问题。
解决办法根据业务需求处理,在合适的时候进行手动位移提交。
三、再均衡再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用
性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消
费者。
需要注意的是在再均衡发生期间,消费组内的消费者是无法读取消息的。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应尽量避免不必要的再均衡的发生。
比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作 ,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。
什么情况下会发生再均衡?- 消费者、分区数量发生变化,使用正则表达式订阅主题,有符合条件的主题被创建。使用consumer.unsubscribe()取消对某些主题的订阅。max.poll.interval.ms,当通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔还没有发起poll操作,则消费组认为该消费者己离开了消费组,将进行再均衡操作。
使用ConsumerRebalanceListener,再均衡监听器,它可以用来设定发生再均衡动作前后的一些准备或者收尾工作。
ConsumerRebalanceListener是一个接口,包含两个方法,具体代码如下:
//partitions 表示再均衡之前分配到的分区 void onpartitionRevoked(Collectionpartitions)
这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。我们可以用它来处理消费位移的提交。
//partitions 表示再均衡之后分配到的分区 void onpartitionAssigned(Collectionpartitions)
这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。
用法如下:
HashMapcurrentOffsets = new HashMap<>(); KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); consumer.subscribe(Collections.singletonList(KafkaConfig.TOPIC), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection collection) { consumer.commitSync(currentOffsets); currentOffsets.clear(); } @Override public void onPartitionsAssigned(Collection collection) { //... } }); try { while (IS_RUNNING.get()) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println("topic:" + record.topic() + ",offset:" + record.offset() + ",value:" + record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndmetadata(record.offset() + 1)); } consumer.commitAsync(currentOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception e) { if (e != null) { e.printStackTrace(); } else { logger.error("fail to commit offsets {}", offsets); } } }); } } catch (Exception e) { e.printStackTrace(); }
我们将消费位移暂存在一个局部变量currentOffsets中,正常消费的时候就可以通过commitAsync()方法来异步提交消费位移。
若发生再均衡,则在发生再均衡之前通过再均衡监听器的onPartitionRevoked()回调执行commitSync()方法来同步提交位移。



