栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KafkaController内部的监听器

KafkaController内部的监听器

一、KafkaController内部的监听器

作为Kafka的重要模块,KafkaController主要通过监听zookeeper节点的变化来感知集群中元数据的变化,KafkaController内置多个zookeeper监听函数,监听的对象包括:在线的Broker Server列表,Topic列表,Partition对应的AR列表,Partition对应的ISR列表等等。KafkaController把这些信息都保存在ControllerContext中,ControllerContext内部保存的信息如下:

class ControllerContext(val zkClient: ZkClient,
                        val zkSessionTimeout: Int) {
  var controllerChannelManager: ControllerChannelManager = null
  val controllerLock: ReentrantLock = new ReentrantLock()
  var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
  val brokerShutdownLock: Object = new Object
  var epoch: Int = KafkaController.InitialControllerEpoch - 1
  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
  val correlationId: AtomicInteger = new AtomicInteger(0)
  var allTopics: Set[String] = Set.empty
  var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
  var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
  var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
  var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet

  private var liveBrokersUnderlying: Set[Broker] = Set.empty
  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
......
}

它们的含义如下:

变量名含义
shuttingDownBrokerIds处于关机状态的Brokers列表
liveBrokersUnderlying处于在线状态的Brokers列表,存储的是Broker对象
liveBrokerIdsUnderlying
处于在线状态的Brokers列表,存储的时Broker Id。
allTopics当前的Topic列表
partitionReplicaAssignment当前TopicAndPartition的AR列表
partitionLeadershipInfo当前TopicAndPartition的Leader和ISR列表
partitionsBeingReassigned处于TopicAndPartition重分配状态的AR列表
partitionsUndergoingPreferredReplicaElection为了负载均衡,处于正在经历首选副本选举状态的TopicAndPartition列表

下面以Topic状态监听器TopicChangeListener为例,说明zookeeper监听器在KafkaController中的运行方式。

二、TopicChangeListener工作原理
PartitionStateMachine在路径为/brokers/topics的zookeeper上注册监听器TopicChangeListener,该监听器主要监听Topic的创建,当Topic被创建时,就会触发TopicChangeListener中的处理函数。
def handleChildChange(parentPath : String, children : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            ///获取brokers/topics目录下所有的topic
            val currentChildren = {
              import JavaConversions._
              (children: Buffer[String]).toSet
            }
             
            val newTopics = currentChildren -- controllerContext.allTopics
            
            val deletedTopics = controllerContext.allTopics -- currentChildren
            //更新所有的topic
            controllerContext.allTopics = currentChildren
            //获取新增topic的AR列表
            val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
              //获取已经删除的Topic的AR列表
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            //将新增的Topic的AR列表更新至partitionReplicaAssignment
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
           
            if(newTopics.size > 0)
              //开始创建新增Topic的分区。
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
          } catch {
            ......
          }
        }
      }
    }

如果发现了新增的topic,就要进入创建topic的流程,流程如下:

def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    
    // 注册 partition 变化的监听器
    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
   //创建分区
    onNewPartitionCreation(newPartitions)
  }

可见,topic的创建主要说注册partition变化的监听器,然后再创建分区,如果创建分区成功,topic就创建成功了。创建分区的流程如下:

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    //设置分区状态为NewPartition,主要进行初始化分区的状态
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
     //设置分区副本的状态为NewReplica,主要进行初始化副本的状态   replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    //将分区的状态由NewPartition转换为OnlinePartition。
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
   //将副本状态由 newPartitions转换为onlineReplica
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

经过上面的流程,topic就创建成功了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752782.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号