Kafka的偏移量其实是顺序递增,简单理解就是自增id,从0开始。
Kafka一般不会重复消费的,因为Kafka的broke会记录每个消费者组的消费offset,每次开启消费的时候,会从这个offset开始消费。
Kafka如何消费已经消费过的数据办法1:开启另一个消费者组,进行消费。因为新的消费者组的offset肯定是从0开始的。
办法2:在这个消费者组中,重新设置偏移量进行消费。
还有其他的啥办法不,这个我不知道了。就先记到这里吧。
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "173.14.22.49:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//设置消费的偏移量,如果以前消费过则接着消费,如果没有就从头开始消费
properties.put("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", false);
//消费者组
properties.put("group.id", "he1qdaqwq2iow_1234");
KafkaConsumer consumer = new KafkaConsumer(properties);
//注意一定不要用订阅模式
//consumer.subscribe(Collections.singleton("topicName"));
TopicPartition topicPartition = new TopicPartition("topicName", 0);
consumer.assign(Arrays.asList(new TopicPartition("topicName",0),new TopicPartition("topicName",1)));
consumer.seek(new TopicPartition("topicName",0), 6);
while (flag) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(3000));
System.out.println("Get record size : " + records.count());
for (ConsumerRecord record : records) {
// 循环打印消息记录
//处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
consumer.commitAsync();
}
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.close();
}
} 


