栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka时间相关参数作用的理解及使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka时间相关参数作用的理解及使用

session.timeout.ms 与 max.poll.interval.ms区别

在Kafka 0.10.0和更早的版本,只有session.timeout.ms。而max.poll.interval.ms是Kafka 0.10.1才引入的。

在老版本(0.10.0和更早的版本,以后都简称老版本,之后的版本称为新版本)中,处理线程和心跳线程是没有剥离的,是在poll()逻辑中做心跳包的,假设处理一条消息需要1分钟。如果heartbeat和poll是耦合的,你需要设置session.timeout.ms大于1分钟,以防止消费者超时,进而触发rebalance。这样做正常情况下也没有问题,但是,如果一个消费者Consumer真的挂掉,极端情况下它需要超过1分钟的时间才能检测到失败的消费者,再开始rebalance,也就是说对消费者的状态检测强依赖于数据的处理时效。

所以为了解决之种强耦合,在之后的版本,通过将心跳检测从poll()的调用中解耦,把心跳检测做守护线程,允许数据处理时间超过心跳间隔(即:两个连续poll()之间的时间)。

新版本解耦轮询和心跳,允许在两个连续轮询之间发送心跳,也就是说心跳检测线程和数据处理线程互不干扰。现在有两个线程在运行,即心跳线程和处理线程,因此新版本每个线程引入了超时。Session.timeout.ms表示心跳线程,而max.poll.interval.ms表示处理线程处理最大时间。

假设设置了session.timeout.Ms=30000,因此,消费者心跳线程必须在此时间到期之前向代理发送心跳。另一方面,如果单个消息的处理需要1分钟,则可以将max.poll.interval.ms设置为大于1分钟的值,以便给处理线程更多的时间来处理消息。

如果处理线程死亡,则需要max.poll.interval.ms来检测。但是,如果整个Consumer死亡(一个死亡的处理线程很可能使包括heartbeat线程在内的整个使用者崩溃),只需要session.timeout.ms就可以检测到它。

其思想是,即使处理本身需要相当长的时间,也允许快速检测失败的消费者。

max.poll.interval.ms主要是一个客户端概念:如果在max.poll.interval.ms内没有调用poll(),心跳线程将检测到这种情况,并向Broker发送离开组请求(心跳线程里会检查如果poll间隔超出了maxPollIntervalMs,就会发起LeaveGroup请求,导致rebalance)。max.poll.interval.ms还与消费者组的rebalance相关:如果rebalance被触发,消费者有max.poll.interval.ms时间通过调用poll()客户端来重新加入组,该客户端触发了一个加入组的请求。

总结来说,session.timeout.ms主要处理的是Consumer的状态,而max.poll.interval.ms监控是处理线程的状态,可以防止处理线程卡死,导致的处理超时。

heartbeat.interval.ms

heartbeat.interval.ms 表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上,因为我们不能因为一次没有收到心跳包就认为这个consumer挂掉,踢出group。这种好处主要是可以避免网络抖动,服务发生GC时没有发送心跳包等问题.

max.poll.interval.ms

max.poll.interval.ms:每次消费的处理时间,注意他会检测当前处理线程的处理时间是否超过:max.poll.interval.ms,如果超过,就会发起LeaveGroup请求,导致rebalance

max.poll.records:每次消费的最大拉取消息数,确定一批处理的时间不会超过max.poll.interval.ms

auto.commit.interval.ms

auto.commit.interval.ms的默认值是 5000,单位是毫秒,这个参数启用前提是enable.auto.commit=true。kafka consumer是在每次poll()之前去判断:是否已经到了要commint的时间( commitTime <= now()),如果已经达到提交时间,就会提交offset,同时计算好下一次要commit的时间戳,所以他实际不是严格的每5秒提交一次,他的提交时间间隔是大于等于5秒,因为当你一次poll()请求处理超过5秒时,他也是在下一次poll()时才去提交offset(不是定时任务提交offset)。这个参数,根据数据的处理的频率进行设置,在没commit offset之前如果consumer挂掉,发生rebalance,他所处理的数据就会重新被其它consumer重复处理。所以也不宜设置过大,导致有大量数据实际已经被处理,但是未提交offset,导致产生“数据积压”假象。

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760645.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号