1. Kafka re-consumes messages that were already consumed
Causes:
1. The consumer process is forcibly killed (crash, restart, etc.), so offsets for data that was already processed are never committed.
2. With auto-commit enabled, calling consumer.unsubscribe() before close() during shutdown can leave some offsets uncommitted, so the next restart consumes them again.
3. (Common) The partition connection is lost before the offset is committed. Typically, processing the polled data is slow enough to exceed Kafka's session timeout (30 s by default in 0.10.x), which triggers a rebalance; there is then a good chance some offsets were not committed, so those records are consumed again after the rebalance.
4. When partitions are reassigned to consumers, a consumer may start over from an earlier offset, redelivering messages.
5. When a consumer is very slow, it may not finish within one session period, so the heartbeat mechanism reports it as failed.
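The redelivery mechanics behind causes 1 and 3 can be illustrated without a broker. The stdlib-only sketch below (class and method names are invented for illustration; no real Kafka involved) simulates a consumer that "crashes" before committing the offset of its last processed record, so the restarted consumer reads that record a second time:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulates at-least-once delivery: the "broker" only remembers the last
 * COMMITTED offset, so a record processed but not committed before a crash
 * is delivered again on restart.
 */
class DuplicateConsumptionDemo {
    private final List<String> partitionLog;   // stands in for one partition
    private int committedOffset = 0;           // what the "broker" remembers

    DuplicateConsumptionDemo(List<String> log) {
        this.partitionLog = log;
    }

    /**
     * Process up to {@code max} records, committing after each one EXCEPT
     * the last, then "crash" -- mimicking a kill before the final commit.
     */
    List<String> runAndCrash(int max) {
        List<String> processed = new ArrayList<>();
        int pos = committedOffset;             // always resume from the committed offset
        while (pos < partitionLog.size() && processed.size() < max) {
            processed.add(partitionLog.get(pos));
            pos++;
            if (processed.size() < max) {
                committedOffset = pos;         // offset committed normally
            }                                  // last record: crash before commit
        }
        return processed;
    }

    public static void main(String[] args) {
        DuplicateConsumptionDemo demo =
                new DuplicateConsumptionDemo(List.of("a", "b", "c"));
        System.out.println(demo.runAndCrash(3));  // [a, b, c] -- "c" was never committed
        System.out.println(demo.runAndCrash(3));  // [c] -- redelivered after restart
    }
}
```

This is why consumers are usually written to be idempotent: with at-least-once delivery, the record whose offset was never committed will always come back.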
2. Manually re-consuming data
1. To read already-consumed data again, create another consumer (for example with the SimpleConsumer API), point it at topic01, and re-read the earlier data.
3. Is data deleted once it has been consumed?
No. Consuming only advances the consumer's offset; the data itself remains in the partition until the broker's retention policy removes it.
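Data is removed by the broker's retention settings, not by consumption. A minimal server.properties fragment (the values shown are the Kafka defaults):

```properties
# config/server.properties -- old segments are deleted by retention, never by reads
log.retention.hours=168      # keep data for 7 days (the default)
log.retention.bytes=-1       # no per-partition size limit (the default)
log.cleanup.policy=delete    # delete old segments (alternative: compact)
```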
4. Where does Kafka store its data?
In the directory (or directories) configured by log.dirs in config/server.properties.
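For illustration, a typical on-disk layout under log.dirs (e.g. the default /tmp/kafka-logs) looks like this -- one directory per partition, each holding segment files:

```text
/tmp/kafka-logs/                        # log.dirs
└── topic01-0/                          # <topic>-<partition>
    ├── 00000000000000000000.log        # message data segment
    ├── 00000000000000000000.index      # offset index
    └── 00000000000000000000.timeindex  # timestamp index (0.10+)
```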
5. How do I delete old messages in Kafka?
1. Set delete.topic.enable=true in server.properties, then delete by topic:
./bin/kafka-topics --delete --zookeeper <zookeeper server:port> --topic <topic name>
2. Manually delete the topic's files under log.dirs (not recommended).
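Topics can also be deleted programmatically with the AdminClient API (available since Kafka 0.11). A minimal sketch -- the broker address and topic name are placeholders, and delete.topic.enable=true must still be set on the broker:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicDeleter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder address -- point this at your broker
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // deleteTopics() is asynchronous; all().get() blocks until the broker confirms
            admin.deleteTopics(Collections.singleton("topic01")).all().get();
        }
    }
}
```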
Example: resetting a consumer's offsets through the KafkaConsumer API (this is how question 2 above is done in code). Note that the three seek calls are alternatives -- each later call overrides the previous position, so in practice you would use exactly one of them:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

private static final String KAFKA_TOPIC = "kafka-topic";
private static final String KAFKA_HOST = "host";
private static final String KAFKA_GROUP_ID = "group-id";

// list the topic's partitions and reset the offset manually
public void testResetOffset(Long offset) {
    // build the properties for a new consumer
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // subscribe to the incoming topic
    consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
    // collect the topic's partitions
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }
    // poll once so the group coordinator assigns partitions to this consumer;
    // seek() only works on partitions the consumer actually owns
    consumer.poll(Duration.ofSeconds(2));
    // pick exactly ONE of the following resets (each later call overrides the earlier ones):
    consumer.seekToBeginning(topicPartitions);      // rewind all partitions to the earliest offset
    consumer.seek(topicPartitions.get(0), offset);  // move one partition to a designated offset
    consumer.seekToEnd(topicPartitions);            // skip all partitions to the latest offset
    // consume messages as usual
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        System.out.println("topic: " + record.topic());
        System.out.println("value: " + record.value());
    }
}