- 1.max.poll.records
- 2.max.poll.interval.ms
- 3.enable.auto.commit
- 4.heartbeat.interval.ms
- 5.session.timeout.ms
- 6.auto.offset.reset
- 6.指定分区消费
- 7.消息回溯消费
- 8.指定offset消费
- 9.指定时间点开始消费
- 默认情况下,消费者一次会poll500条消息
- 一次poll到500条,就直接执行for循环
- 一次没有poll到500条,且时间在1秒内,长轮询继续poll,要么到500条,要么到1s
- 如果多次poll都没到500条,且1秒时间到了,那么直接for循环
props.put("max.poll.records",500);
实例:
while (true) { //1)
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
2.max.poll.interval.ms
- 可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者。
- 会触发rebalance机制,rebalance机制会造成性能开销,可以通过设置max.poll.records参数,让一次poll的消息条数少一点。
props.put("max.poll.interval.ms",30*1000);
3.enable.auto.commit
- 指定了消费者是否自动提交消费位移,默认为true。
- 如果需要减少重复消费或者数据丢失,你可以设置为false。
- 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",false);
4.heartbeat.interval.ms
- consumer给broker发送心跳的间隔时刻,每隔1s
props.put("heartbeat.interval.ms",1000);
5.session.timeout.ms
props.put("session.timeout.ms",10*1000);
- kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
- 默认latest:新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。
- earliest:可以通过以下设置,让新的消费者第一次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量+1)
//新消费组从头消费
props.put("auto.offset.reset","earliest");
6.指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));7.消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));8.指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0))); consumer.seek(new TopicPartition(TOPIC_NAME,0),10);9.指定时间点开始消费
//从指定时间点开始消费
List partitionInfos = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
HashMap map = new HashMap<>();
for (PartitionInfo partitionInfo : partitionInfos){
map.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()),fetchDataTime);
}
//分区,偏移量
Map parMap = consumer.offsetsForTimes(map);
for(Map.Entry entry : parMap.entrySet()){
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if(key == null || value == null) continue;
long offset = value.offset();
System.out.println("partition-" + key.partition() + " |offset-" + offset);
System.out.println();
consumer.assign(Collections.singletonList(key));
consumer.seek(key,offset);
}



