基本概念
消费者和消费组 流程配置参数订阅消费
消费位置 提交消费位移(offset)
提交offset失败的处理 消息回溯再均衡消费者拦截器多线程实现
基本概念 消费者和消费组消费者是具体实例,消费组是一个约束、划分消费者的东西。
在同一时间内,同一个消费组内,一个partition只能分配给一个消费者,而一个消费者可以消费多个partition;在多个消费组内,一个partition可以分配给多个消费组(同样每个消费组内只有一个消费者能消费该partition)。
一个partition对应一台机器,维护offset比较简单;一个partition对应多台机器从技术上肯定能做到,但要么需要解决冲突,要么允许重复消费,这样就损耗了性能、资源,弊大于利。
可以参考huxihx大佬的回答
https://www.zhihu.com/question/328057678?ivk_sa=1024320u
消息的消费一般有两种模式:推模式和拉模式。
推模式是服务端主动将消息推送给消费者拉模式是消费者主动向服务端发起请求来拉取消息。
Kafka中的消费是基于拉模式的。
流程一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数及创建相应的消费者实例。订阅主题。拉取消息并消费。提交消费位移。关闭消费者实例。
bootstrap.servers:同生产者key.deserializer 和 value.deserializer:反序列化器,类比生产者的序列化器group.id:消费者隶属的消费组的名称client.id:消费者实例的名称,默认为“consumer-{number}”fetch.min/max.bytes:一次拉取(poll)能从Kafka中拉取的最小/大数据量,默认值为1max.poll.records:Consumer在一次拉取请求中拉取的最大消息数,默认值为500条 订阅
subscribe方法订阅主题
assign方法指定分区
轮询poll方法消费
public ConsumerRecordspoll(final Duration timeout)
ConsumerRecords的records方法可根据分区维度或topic维度消费消息
消费位置每当消费者查找不到所记录的消费位移时,如:
当一个新的消费组建立的时候,它根本没有可以查找的消费位移消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移当__consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移
就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。
位移越界也会触发 auto.offset.reset参数的执行
提交消费位移(offset)偏移量:对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。
位移:对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。
offset必须要持久化,不然宕机就丢失了。kafka的做法是,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。
需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+1
默认情况下,每个5s自动进行一次提交消费位移;
这样的逻辑虽然简单,但存在问题:
没消费完就提交了,造成消息丢失消费完,在提交过程中消费者宕机,重启后会重复消费
因此实际上需要根据代码逻辑手动提交
同步提交:commitSync方法异步:commitAsync方法,不阻塞消费线程,可能提交消费位移的结果未返回就开始了新一轮的poll 提交offset失败的处理
可以自己维护一个递增的提交顺序,相同时重试提交,比原值大时不需要重试,
重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率,需要综合考虑。
//指定分区和该分区offset public void seek(TopicPartition partition, long offset)
可以通过offsetsForTimes方法根据timestamp获取offset间接seek
再均衡再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组会变得不可用,消费者当前的状态也会丢失。
比如消费者消费完还没有来得及提交消费位移就发生了再均衡操作,原来被消费完的那部分消息就被重复消费了。
public interface ConsumerRebalanceListener {
//在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。参数partitions表示再均衡前所分配到的分区。
void onPartitionsRevoked(Collection partitions);
//在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区。
void onPartitionsAssigned(Collection partitions);
}
消费者拦截器
略
多线程实现KafkaProducer是线程安全的,KafkaConsumer是非线程安全的。
KafkaConsumer中的每个公用方法(除了wakeup)在执行所要执行的动作之前都会调用acquire方法,类似加锁,但它是轻量级锁,仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。release方法解锁。
常见的多线程消费为一个线程一个KafkaConsumer实例
多个消费线程同时消费同一个分区也是可以的,通过 assign、seek等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少。
实际上poll的速度很快,瓶颈在拉取消息后的处理逻辑,所以使用线程池多线程处理即可。



