栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

GroupCoordinator 源码之HEARTBEAT

GroupCoordinator 源码之HEARTBEAT

GroupCoordinator 源码之HEARTBEAT 前言:

​ 上一节GroupCoordinator 源码之FIND_COORDINATOR、JOIN_GROUP、SYNC_GROUP_aLivable_Dedode的博客-CSDN博客 ,介绍了从KafkaServer端GroupCoordinator 启动后,同一个Topic同一个GroupId下consumer的注册过程以及Group的状态变更。Topic会选举出来一个GroupCoordinator,之后Group会从第一次出现时被创建初始状态是Empty,然后第一个consumer加入会被选举为Leader,接着Group的状态会变为PrepareRebalance。通过延迟队列的Task,将Group的状态变更为CompletingRebalance。

​ 之后SYNC_GROUP请求会争对group下的所有consumer,下发消费策略通过回调函数返回给各个consumer。并且完成之后Group的状态会变为Stable。

HEARTBEAT

​ group下的所有consumer通过心跳机制HEARTBEAT,确定当前的消费策略是不是有效的,无效的情况下会发起rejoin请求以便回去正确的消费策略。

入口:

case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)

kafka.coordinator.group.GroupCoordinator#handleHeartbeat(groupId: String,
                      memberId: String,
                      generationId: Int,
                      responseCallback: Errors => Unit)
                      
// 会根据请求中的groupId,获取对应的Groupmetadata信息 
groupManager.getGroup(groupId)
//根据当前group的currentState 做分支判断
group.currentState match {
		// Dead 状态,The coordinator is not aware of this member
          case Dead =>
            // if the group is marked as dead, it means some other thread has just removed the group
            // from the coordinator metadata; this is likely that the group has migrated to some other
            // coordinator OR the group is in a transient unstable phase. Let the member retry
            // joining without the specified member id,
            responseCallback(Errors.UNKNOWN_MEMBER_ID)

		// Empty 状态,The coordinator is not aware of this member
          case Empty =>
            responseCallback(Errors.UNKNOWN_MEMBER_ID)

          case CompletingRebalance =>
          	// 当前group 中不含该member,不识别
            if (!group.has(memberId))
              responseCallback(Errors.UNKNOWN_MEMBER_ID)
            else
              responseCallback(Errors.REBALANCE_IN_PROGRESS)

          case PreparingRebalance =>
          // 当前group 中不含该member,不识别
            if (!group.has(memberId)) {
              responseCallback(Errors.UNKNOWN_MEMBER_ID)
            } else if (generationId != group.generationId) {
            // 当前group的分代Id不匹配,generationId过期
              responseCallback(Errors.ILLEGAL_GENERATION)
            } else {
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.REBALANCE_IN_PROGRESS)
            }

          case Stable =>
          // 当前group 中不含该member,不识别
            if (!group.has(memberId)) {
              responseCallback(Errors.UNKNOWN_MEMBER_ID)
            } else if (generationId != group.generationId) {
            // 当前group的分代Id不匹配,generationId过期
              responseCallback(Errors.ILLEGAL_GENERATION)
            } else {
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.NONE)
            }
        }

过程不复杂,挺简单的:

    Empty、Dead 直接是不是别当前memberId的异常,说明该groupmetadata不包含。CompletingRebalance、PreparingRebalance、Stable
      校验memberId,不识别则抛出异常校验generationId,保证consumer的generationId是有效的,这样才能保证consumer的消费策略是正确的,反正则抛出异常。尝试完成Heartbeat延迟队列中的任务,并添加下次Heartbeat的Task,如果是Stable则异常内容是NONE

上图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f2arX3EW-1646056968744)(GroupCoordinator 源码之HEARTBEAT.assets/1646056856360.png)]

Heartbeat的Task,如果是Stable则异常内容是NONE

上图:

这一次很简单,心跳Heartbeat的意义就是保证Group下consumer跟group的状态、年代Id能同步,这样才可保证消费策略是对的。

个人总结,错漏之处,还请多指教。
公众号:大数据下挣扎,欢迎感兴趣的同学关注。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752494.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号