- 强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
- 网络波动,导致offset没提交
- 当消费者消费的速度很慢时,一批消息可能在一个poll周期(max.poll.interval.ms)内还未处理完,消费者会被协调器判定为失效并触发rebalance,导致已消费消息的offset未能提交
- 消费后的数据,当offset还没有提交时,partition就断开连接
- 最根本的原因是消费之后的offset未提交
- 第一种思路是提高消费能力,提高单条消息的处理速度,例如对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时长的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题,对代码改动较小,但无法绝对避免重复消费问题。
- 第二种思路是引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,并配置max.poll.records的值小于1000,这样即使一整批消息被重新拉取,其中每条消息的id仍能在已保存的记录中查到。在消费消息时先通过前置表去重后再进行消息的处理。
- 模拟网络问题
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@Component
public class KafkaTest {
//topic
private final static String TOPIC_NAME = "my-replicated-topic";
//程序执行的初始时间,只会保留一份
private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
//时间转换
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//缓存
private final List> DataList = new ArrayList<>(500);
//json
private final Gson gson = new GsonBuilder().create();
//kafka
@Autowired
private KafkaTemplate
- 观察结果
- 恢复网络,正常通信
/**
 * Kafka listener for the duplicate-consumption demo on topic {@code TOPIC_NAME},
 * consumer group "MyGroup1".
 *
 * <p>Appends each received record to the in-memory {@code DataList} buffer and then
 * manually acknowledges it via {@code ack.acknowledge()}.
 *
 * <p>NOTE(review): the generic type arguments appear to have been stripped from this
 * snippet (raw {@code ConsumerRecord}); presumably {@code ConsumerRecord<String, String>} — confirm.
 *
 * @param record the record delivered by the listener container
 * @param ack    handle used to manually acknowledge (commit) this record's offset
 */
@KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
public void listenGroup(ConsumerRecord record, Acknowledgment ack) {
// NOTE(review): the record is only buffered here, not processed; presumably a @Scheduled
// task elsewhere drains DataList — confirm. Because the ack below happens before that
// processing, a crash in between would lose the buffered records rather than redeliver them.
// DataList is declared as a plain ArrayList; if another thread drains it concurrently,
// access needs synchronization, which is not visible in this snippet.
DataList.add(record);
// Manually commit the offset.
ack.acknowledge();
}
- 观察结果
- 对比
结论:造成了重复消费
1.4 解决
package com.demo.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a single-server {@link RedissonClient} bean.
 *
 * <p>The Redis address and password are read from the environment variables
 * {@code REDISSON_ADDRESS} and {@code REDISSON_PASSWORD}. This keeps credentials out of
 * source control. When a variable is unset or blank, the original demo values are used,
 * so behaviour is unchanged for existing setups.
 */
@Configuration
public class RedissonConfig {

    /** Environment variable holding the Redis URI, e.g. {@code redis://host:6379}. */
    private static final String ADDRESS_ENV = "REDISSON_ADDRESS";

    /** Environment variable holding the Redis password. */
    private static final String PASSWORD_ENV = "REDISSON_PASSWORD";

    /** Demo fallback address; "ip" is a placeholder host that must be overridden for real use. */
    private static final String DEFAULT_ADDRESS = "redis://ip:6379";

    /** Demo fallback password, kept only for backward compatibility; override it outside the demo. */
    private static final String DEFAULT_PASSWORD = "admin123";

    /**
     * Builds the Redisson client from the resolved single-server address and password.
     *
     * @return a {@link RedissonClient} created with {@link Redisson#create(Config)}
     */
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress(resolve(ADDRESS_ENV, DEFAULT_ADDRESS))
                .setPassword(resolve(PASSWORD_ENV, DEFAULT_PASSWORD));
        return Redisson.create(config);
    }

    /**
     * Returns the value of environment variable {@code name}, or {@code fallback} when it is
     * unset or contains only whitespace.
     */
    private static String resolve(String name, String fallback) {
        String value = System.getenv(name);
        // trim().isEmpty() rather than isBlank() keeps this compatible with Java 8.
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }
}
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.demo.demo.utils.RedisUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@Component
public class RedissonKafka {
private final static String TOPIC_NAME = "my-replicated-topic";
//程序执行的初始时间,只会保留一份
private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
//时间转换
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//前缀
private static final String KEY_PREFIX = "key";
//缓存
private final List> DataList = new ArrayList<>(500);
//json
private final Gson gson = new GsonBuilder().create();
//kafka
@Resource
private KafkaTemplate 


