栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

《Kafka权威指南》——问题1——onParitionsAssigned

《Kafka权威指南》——问题1——onParitionsAssigned

四、Kafka消费者——从Kafka读取数据 4.8 从特定偏移量处开始处理数据

4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以在为消费者分配新分区或移除分区时,可以调用执行一些代码。

  1. onPartitionsRevoked 方法会在再均衡开始之前和消费者读取消息之后被调用
  2. onParitionsAssigned 方法会在重新分配分区之后和消费者开始读取消息之前被调用

4.8节中,使用seek()方法,并在消费者启动或分配到新分区时,可以使用seek()方法查找保存在数据库里的偏移量。
示例:

public class SaveOffsetsOnrebalnce implements ConsumerRebalanceListener{
	public void onPartitionsRevoked(Collection partitions){
		commitDBTransaction(); // 1
	}
	public void onParitionsAssigned(Collection partitions){
		for (TopicPartition partition : partitions)
			consumer.seek(partition, getOffsetFromDB(partition)); // 2
	}
	consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
	consumer.poll(0);
	for (TopicPartition partition : consumer.assignment())
		consumer.seek(partition, getOffsetFromDB(partition)); // 3

	while (true){
		ConsumerRecords records = consumer.poll(100);
		...
	}
}

书上说
2处,从数据库获取偏移量,在分配到新分区的时候,使用seek()方法定位到那些记录。
3处,订阅主题后,开始启动消费者,我们马上调用一次poll()方法,让消费者加入到消费者群组里,并获取分配到的分区,然后马上调用seek()方法定位分区的偏移量。

问题

  1. 为什么在有 onParitionsAssigned 中调用seek()的情况下(消费者加入后应该会调用),还要在3处seek()第二遍?
  2. 为什么需要poll(0)?不是很理解 <让消费者加入到消费者群组里>的解释。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699700.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号