栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka核心组件之Controller源码解析(6)

Kafka核心组件之Controller源码解析(6)

Controller核心源码解析

下文基于Kafka 2.2对Controller进行分析,涉及内容较多,从Controller启动流程、选举流程、初始化工作、以及Controller的主要工作原理都有详细说明,还请大家耐心浏览

1、Controller启动流程【主要看写的源码注释】

def startup() = {

zkClient.registerStateChangeHandler(new StateChangeHandler {

override val name: String = StateChangeHandlers.ControllerHandler

override def afterInitializingSession(): Unit = {

eventManager.put(RegisterBrokerAndReelect)

}

override def beforeInitializingSession(): Unit = {

val expireEvent = new Expire

eventManager.clearAndPut(expireEvent)

// Block initialization of the new session until the expiration event is being handled,

// which ensures that all pending events have been processed before creating the new session

expireEvent.waitUntilProcessingStarted()

}

})

eventManager.put(Startup)

eventManager.start()

}

2、Controller选举流程【主要看写的源码注释】

private def elect(): Unit = {

activeControllerId = zkClient.getControllerId.getOrElse(-1)

if (activeControllerId != -1) {

debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")

return

}

try {

val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)

controllerContext.epoch = epoch

controllerContext.epochZkVersion = epochZkVersion

activeControllerId = config.brokerId

info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " + s"and epoch zk version is now ${controllerContext.epochZkVersion}")

onControllerFailover()

} catch {

case e: ControllerMovedException => maybeResign()

if (activeControllerId != -1)

debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)

else

warn("A controller has been elected but just resigned, this will result in another round of election", e)

case t: Throwable =>

error(s"Error while electing or becoming controller on broker ${config.brokerId}. " + s"Trigger controller movement immediately", t)

triggerControllerMove()

}

}

3、成为Controller后的初始化工作【主要看写的源码注释】

private def onControllerFailover() {

info("Registering handlers")

// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks

val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, isrChangeNotificationHandler)

childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)

val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler) nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

info("Deleting log dir event notifications")

zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)

info("Deleting isr change notifications")

zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)

info("Initializing controller context")

initializeControllerContext()

info("Fetching topic deletions in progress")

val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()

info("Initializing topic deletion manager")

topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

info("Sending update metadata request")

sendUpdatemetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

replicaStateMachine.startup()

partitionStateMachine.startup()

info(s"Ready to serve as the new controller with epoch $epoch")

maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)

topicDeletionManager.tryTopicDeletion()

val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()

onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)

info("Starting the controller scheduler")

kafkaScheduler.startup()

if (config.autoLeaderRebalanceEnable) {

scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)

}

if (config.tokenAuthEnabled) {

info("starting the token expiry check scheduler")

tokenCleanScheduler.startup()

tokenCleanScheduler.schedule(name = "delete-expired-tokens", fun = () => tokenManager.expireTokens, period = config.delegationTokenExpiryCheckIntervalMs, unit = TimeUnit.MILLISECONDS)

}

}

4、从KafkaController类看Controller的主要工作【主要看写的源码注释】

class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {

this.logIdent = s"[Controller id=${config.brokerId}] "

@volatile private var brokerInfo = initialBrokerInfo

@volatile private var _brokerEpoch = initialBrokerEpoch

private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)

val controllerContext = new ControllerContext

// have a separate scheduler for the controller to be able to start and stop independently of the kafka server

// visible for testing

private[controller] val kafkaScheduler = new KafkaScheduler(1)

// visible for testing ,

private[controller] val eventManager = new ControllerEventManager(config.brokerId, controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())

val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)

private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)

val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))

val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) partitionStateMachine.setTopicDeletionManager(topicDeletionManager)

private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)

private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)

private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty

private val topicChangeHandler = new TopicChangeHandler(this, eventManager)

private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)

private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty

private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)

private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)

private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)

private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)

5、其他源码部分

Controller还有几个重要部分的源码:

Controller 发送模型NetWorkControllerChannelManagerController-Partition状态机Controller-Replica状态机Controller-分区副本重分配(PartitionReassignment)与Preferred leader副本选举Controller-Broker的上线与下线Controller-LeaderAndIsr请求Topic 的新建/扩容/删除

由于代码和注释比较多,在此略过。

以上内容均转载自云加社区 ,作者袁吉,喜欢的同学可以去关注下,谢谢

国内最大最权威的 Kafka中文社区 ,在这里你可以结交各大互联网Kafka大佬以及近2000+Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,免费加入中~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701754.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号