4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以在为消费者分配新分区或移除分区时,可以调用执行一些代码。
- onPartitionsRevoked 方法会在再均衡开始之前和消费者读取消息之后被调用
- onParitionsAssigned 方法会在重新分配分区之后和消费者开始读取消息之前被调用
4.8节中,使用seek()方法,并在消费者启动或分配到新分区时,可以使用seek()方法查找保存在数据库里的偏移量。
示例:
public class SaveOffsetsOnrebalnce implements ConsumerRebalanceListener{
public void onPartitionsRevoked(Collection partitions){
commitDBTransaction(); // 1
}
public void onParitionsAssigned(Collection partitions){
for (TopicPartition partition : partitions)
consumer.seek(partition, getOffsetFromDB(partition)); // 2
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition : consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition)); // 3
while (true){
ConsumerRecords records = consumer.poll(100);
...
}
}
书上说
2处,从数据库获取偏移量,在分配到新分区的时候,使用seek()方法定位到那些记录。
3处,订阅主题后,开始启动消费者,我们马上调用一次poll()方法,让消费者加入到消费者群组里,并获取分配到的分区,然后马上调用seek()方法定位分区的偏移量。
问题
- 为什么在有 onParitionsAssigned 中调用seek()的情况下(消费者加入后应该会调用),还要在3处seek()第二遍?
- 为什么需要poll(0)?不是很理解 <让消费者加入到消费者群组里>的解释。



