Controller核心源码解析
下文基于Kafka 2.2对Controller进行分析,涉及内容较多,从Controller启动流程、选举流程、初始化工作、以及Controller的主要工作原理都有详细说明,还请大家耐心浏览
1、Controller启动流程【主要看写的源码注释】
def startup() = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
override def beforeInitializingSession(): Unit = {
val expireEvent = new Expire
eventManager.clearAndPut(expireEvent)
// Block initialization of the new session until the expiration event is being handled,
// which ensures that all pending events have been processed before creating the new session
expireEvent.waitUntilProcessingStarted()
}
})
eventManager.put(Startup)
eventManager.start()
}
2、Controller选举流程【主要看写的源码注释】
private def elect(): Unit = {
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1) {
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " + s"and epoch zk version is now ${controllerContext.epochZkVersion}")
onControllerFailover()
} catch {
case e: ControllerMovedException => maybeResign()
if (activeControllerId != -1)
debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
else
warn("A controller has been elected but just resigned, this will result in another round of election", e)
case t: Throwable =>
error(s"Error while electing or becoming controller on broker ${config.brokerId}. " + s"Trigger controller movement immediately", t)
triggerControllerMove()
}
}
3、成为Controller后的初始化工作【主要看写的源码注释】
private def onControllerFailover() {
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler) nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
info("Deleting isr change notifications")
zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
info("Initializing topic deletion manager")
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
info("Sending update metadata request")
sendUpdatemetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
replicaStateMachine.startup()
partitionStateMachine.startup()
info(s"Ready to serve as the new controller with epoch $epoch")
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
info("Starting the controller scheduler")
kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
if (config.tokenAuthEnabled) {
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens", fun = () => tokenManager.expireTokens, period = config.delegationTokenExpiryCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
}
}
4、从KafkaController类看Controller的主要工作【主要看写的源码注释】
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// visible for testing ,
private[controller] val eventManager = new ControllerEventManager(config.brokerId, controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
5、其他源码部分
Controller还有几个重要部分的源码:
Controller 发送模型NetWorkControllerChannelManagerController-Partition状态机Controller-Replica状态机Controller-分区副本重分配(PartitionReassignment)与Preferred leader副本选举Controller-Broker的上线与下线Controller-LeaderAndIsr请求Topic 的新建/扩容/删除
由于代码和注释比较多,在此略过。
以上内容均转载自云加社区 ,作者袁吉,喜欢的同学可以去关注下,谢谢
国内最大最权威的 Kafka中文社区 ,在这里你可以结交各大互联网Kafka大佬以及近2000+Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,免费加入中~



