导语 | Controller作为Apache Kafka的核心组件,本文将从背景、原理以及源码与监控等方面来深入剖析Kafka Controller,希望带领大家去了解Controller在整个Kafka集群中的作用。
一、背景
Controller,是Apache Kafka的核心组件非常重要。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。
在整个Kafka集群中,如果Controller故障异常,有可能会影响到生产和消费。所以,我们需要对其状态、选举、日志等做全面的监控。
二、Controller是什么
Controller,是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。
集群中的任意一台Broker都能充当Controller的角色,但是,在整个集群运行过程中,只能有一个Broker成为Controller。也就是说,每个正常运行的Kafka集群,在任何时刻都有且只有一个Controller。
三、Controller保存的数据
其中比较重要的数据有:
所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR集合中有哪些副本等。
所有Broker信息。包括当前都有哪些运行中的Broker,哪些正在关闭中的Broker等。
所有涉及运维任务的分区。包括当前正在进行Preferred领导者选举以及分区重分配的分区列表。
这些数据其实在ZooKeeper中也保存了一份。每当控制器初始化时,它都会从ZooKeeper上读取对应的元数据并填充到自己的缓存中。
而Broker上元数据的更新都是由Controller通知完成的,Broker并不从Zookeeper获取元数据信息。
四、Controller职责
Controller职责大致分为5种:
主题管理,分区重分配,Preferred leader选举,集群成员管理(Broker上下线),数据服务(向其他Broker提供数据服务) 。
它们分别是:
UpdatemetadataRequest:更新元数据请求。topic分区状态经常会发生变更(比如leader重新选举了或副本集合变化了等)。由于当前clients只能与分区的leader Broker进行交互,那么一旦发生变更,controller会将最新的元数据广播给所有存活的Broker。具体方式就是给所有Broker发送UpdatemetadataRequest请求。
CreateTopics: 创建topic请求。当前不管是通过API方式、脚本方式抑或是CreateTopics请求方式来创建topic,做法几乎都是在Zookeeper的/brokers/topics下创建znode来触发创建逻辑,而controller会监听该path下的变更来执行真正的“创建topic”逻辑。
DeleteTopics:删除topic请求。和CreateTopics类似,也是通过创建Zookeeper下的/admin/delete_topics/
节点来触发删除topic,controller执行真正的逻辑。 分区重分配:即kafka-reassign-partitions脚本做的事情。同样是与Zookeeper结合使用,脚本写入/admin/reassign_partitions节点来触 发,controller负责按照方案分配分区。
Preferred leader分配:preferred leader选举当前有两种触发方式:自动触发(auto.leader.rebalance.enable=true)和kafka-preferred-replica-election脚本触发。两者“玩法”相同,向Zookeeper的/admin/preferred_replica_election写数据,controller提取数据执行preferred leader分配。
分区扩展:即增加topic分区数。标准做法也是通过kafka-reassign-partitions脚本完成,不过用户可直接往Zookeeper中写数据来实现,比如直接把新增分区的副本集合写入到/brokers/topics/
下,然后controller会为你自动地选出leader并增加分区。 集群扩展:新增broker时Zookeeper中/brokers/ids下会新增znode,controller自动完成服务发现的工作。
broker崩溃:同样地,controller通过Zookeeper可实时侦测broker状态。一旦有broker挂掉了,controller可立即感知并为受影响分区选举新的leader。
ControlledShutdown:broker除了崩溃,还能“优雅”地退出。broker一旦自行终止,controller会接收到一个 ControlledShudownRequest请求,然后controller会妥善处理该请求并执行各种收尾工作。
Controller leader选举:controller必然要提供自己的leader选举以防这个全局唯一的组件崩溃宕机导致服务中断。这个功能也是通过 Zookeeper的帮助实现的。
源码位置可以看后面段落9源码的说明。
五、Broker如何成为Controller
和解决可能的脑裂问题
(一)Broker如何成为Controller最先在Zookeeper上创建临时节点/controller成功的Broker就是Controller。
源码路径(Kafka2.2):
Kafka#main->KafkaServerStartable#startup()->KafkaServer#startup()->KafkaController#startup()->eventManager.put(Startup)->elect()-> zkClient.registerControllerAndIncrementControllerEpoch
Controller重度依赖Zookeeper,依赖zookeepr保存元数据,依赖zookeeper进行服务发现。Controller大量使用Watch功能实现对集群的协调管理。
当broker节点因故障离开Kafka集群时,broker中存在的leader分区将不可用(因为客户端只对leader分区进行读写)。
为了最大限度地减少停机时间,需要快速找到替代的领导分区。Controller可以从zookeeper watch获取通知信息。Zookeeper给了客户端监听znode变化的能力,也就是所谓的watch通知功能。一旦znode节点创建、删除、子节点数量发生变化,或者znode中存储的数据本身发生变化,Zookeeper会通过节点变化处理程序显式通知客户端。
当Broker宕机或主动关闭时,Broker与Zookeeper的会话结束,znode会被自动删除。同样的,Zookeeper的watch机制把这个变化推送给Controller,让Controller知道有Broker down或者up,这样Controller就可以进行后续的协调操作。
Controller将收到通知并对其采取行动,以确定Broker上的哪些分区将成为Leader partition。然后,它会通知每个相关的Broker,或者Broker上的topic partition将成为leader partition,或者LeaderAndIsrRequest从新的leader分区复制数据。
如果Controller所在的Broker故障,Kafka集群必须有新的Controller,否则集群将无法正常工作。这儿存在一个问题。很难确定Broker是宕机还是只是暂时的故障。但是,为了使集群正常运行,必须选择新的Controller。如果之前更换的Controller又正常了,不知道自己已经更换了,那么集群中就会出现两个Controller。
其实这种情况是很容易发生的。例如,由于垃圾回收(GC),一个Controller被认为是死的,并选择了一个新的控制器。在GC的情况下,在原Controller眼里没有任何变化,Broker甚至不知道自己已经被暂停了。因此,它将继续充当当前Controller,这在分布式系统中很常见,称为裂脑。
现在,集群中有两个Controller,可能会一起发出相互冲突的事件,这会导致脑裂。可能会导致严重的不一致。所以需要一种方法来区分谁是集群的最新Controller。
Kafka是通过使用epoch number来处理,epoch number只是一个单调递增的数。第一次选择控制器时,epoch number值为1。如果再次选择新控制器,epoch number为2,依次单调递增。
每个新选择的Controller通过zookeeper的条件递增操作获得一个新的更大的epoch number。当其他Broker知道当前的epoch number时,如果他们从Controller收到包含旧(较小)epoch number的消息,则它们将被忽略。即Broker根据最大的epoch number来区分最新的Controller。
epoch number记录在Zookeepr的一个永久节点controller_epoch。
上图中,Broker3向Broker1下发命令:将Broker1上的partitionA做为leader,消息的epoch number值为1,同时Broker2也向Broker1发送同样的命令。不同的是,消息的epoch number值为2,此时broker1只监听broker2的命令(由于其epoch号大),而会忽略broker3的命令,以免发生脑裂。
六、Controller在版本上的改进
在Kafka2.2之前
网络处理模型:Kafka Server在启动时会初始化SocketServer、KafkaApis和KafkaRequestHandlerPool对象,这也是Server网络处理模型的主要组成部分。Kafka Server的网络处理模型也是基于Java NIO机制实现的,实现模式与Reactor模式类似。
如上图,所有请求共享一个requestQueue队列。
问题:当前Broker对入站请求类型不做任何优先级处理。
不论是PRODUCE请求、FETCH请求还是Controller类的请求。对Controller发送的消息非常不公平,因为这个类请求应该优先级更高。
这就可能造成一个问题:即clients发送的数据类请求积压导致controller推迟了管理类请求的处理。设想这样的场景。假设controller向broker广播了leader发生变更。于是新leader开始接收clients端请求,而同时老leader所在的broker由于出现了数据类请求的积压使得它一直忙于处理这些请求而无法处理controller发来的LeaderAndIsrRequest请求,因此这是就会出现“双主”的情况——也就是所谓的脑裂。
在Kafka 2.2
将控制器发送的请求与普通数据类请求分开处理,源码SocketServer.scala#startup()->KafkaServer.scala。
在0.11版本上也做了大的改进,会在后面段落8中说明。
七、Controller的监控
在整个集群运行过程中,只能有一个Broker成为Controller。所以要监控Controller的数量以及Controller的变更史。
可以用Kafka的JMXTool,进行轻量级的监控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://"${BrokerIP}":"${JMXPort}"/jmxrmi --object-name kafka.controller:type=KafkaController,name=ActiveControllerCount --date-format "YYYY-MM-dd_HH:mm" --reporting-interval -1 | grep -v type
记录Controller变更历:
function inter_controller_history()
{
#第一次检测集群Controller
if [ ! -f "${clusterID}"_controller_history ]; then
awk '/,1$/ {print $0}' "${clusterID}"_controller >> "${clusterID}"_controller_history
#记录Controller变更历史
else
nowController=$(awk '/,1$/ {print $0}' "${clusterID}"_controller | awk -F ',' '{print $1}')
LastTimeController=$(tail -n 1 "${clusterID}"_controller_history | awk '/,1$/ {print $0}' | awk -F ',' '{print $1}')
if [ "${nowController}_X" != "${LastTimeController}_X" ];then
awk '/,1$/ {print $0}' "${clusterID}"_controller >> "${clusterID}"_controller_history
msg="${msg_tmp} clusterID:${clusterID} ${ClusterNameCN} Controller From ${LastTimeController} to ${nowController}"
echo "$msg" >> $log_file_name
send_warning
fi
fi
}
监控效果:
通过JMXTool,还可以拉取Kafka的其他指标进行监控。
例如:
under_replicated_partitions有非同步副本监控。
OfflinePartitionsCount分区丢失leader监控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://${BrokerIP}:"${JMXPort}"/jmxrmi --object-name kafka.controller:type=KafkaController,name=OfflinePartitionsCount --date-format "YYYY-MM-dd_HH:mm" --reporting-interval -1
ZooKeeper_SessionState Broker与Zookeeper断开连接监控。
MessagesInPerSec,进入Broker消费数量监控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://"${BrokerIP}":"${JMXPort}"/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --date-format "YYYY-MM-dd HH:mm" --attributes Count --reporting-interval -1
ISR扩缩容率等。
监控可以有很多方式,这样做主要是简单方便,不需要依赖太多监控系统,同时监控程序可以快速部署到海外或者合作伙伴机房。
八、关于Controller的架构改进
Kafka中的一台Broker充当Controller的角色,此台Broker不仅对生产者消费者提供服务,还要协调整个集群的管理工作。如果使用0.11版本之前的Kafka而且分区很多时,建议将几台机器配置为只能成为Controller(当然这里需要修改源码,编译)。
0.11版本之前
同步操作Zookeeper使用同步的API,性能差。当Broker宕机,大量主题分区发生变更时,自动恢复时间长。Controller是一个分区一个分区进行写入的,对于分区数很多的集群来说,这无疑是个巨大的性能瓶颈。
0.11 版本
异步操作Zookeeper使用async API,写入提升了10倍。
如果机器性能较好,可以将Zookeeper和Controller部署在相同的机器。Kafka对Zookeeper写请求比较少。
注意:消费方式有基于Zookeeper消费和基于Broker消息。基于Zookeeper消费,就是将消费位移提交到Zookeeper上,这种方式对Zookeeper有大量写操作。不要将Zookeeper和其他机器共用。
Zookeeper官网上有对读写占比的压测说明:
九、Controller的源码
源码(基于kafka 2.2)的内容较多:
(一)Controller启动流程【主要看写的源码注释】
def startup() = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
override def beforeInitializingSession(): Unit = {
val expireEvent = new Expire
eventManager.clearAndPut(expireEvent)
// Block initialization of the new session until the expiration event is being handled,
// which ensures that all pending events have been processed before creating the new session
expireEvent.waitUntilProcessingStarted()
}
})
eventManager.put(Startup)
eventManager.start()
}
(二)Controller选举流程【主要看写的源码注释】
private def elect(): Unit = {
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1) {
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
s"and epoch zk version is now ${controllerContext.epochZkVersion}")
onControllerFailover()
} catch {
case e: ControllerMovedException =>
maybeResign()
if (activeControllerId != -1)
debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
else
warn("A controller has been elected but just resigned, this will result in another round of election", e)
case t: Throwable =>
error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
s"Trigger controller movement immediately", t)
triggerControllerMove()
}
}
(三)成为Controller后的初始化工作【主要看写的源码注释】
private def onControllerFailover() {
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
info("Deleting isr change notifications")
zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
info("Initializing topic deletion manager")
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
info("Sending update metadata request")
sendUpdatemetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
replicaStateMachine.startup()
partitionStateMachine.startup()
info(s"Ready to serve as the new controller with epoch $epoch")
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
info("Starting the controller scheduler")
kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
if (config.tokenAuthEnabled) {
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = () => tokenManager.expireTokens,
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
}
(四)从KafkaController类看Controller的主要工作【主要看写的源码注释】
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics,
initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// visible for testing ,
private[controller] val eventManager = new ControllerEventManager(config.brokerId,
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
ff
val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
(五)其他源码部分
Controller还有几个重要部分的源码:
Controller 发送模型NetWork
ControllerChannelManager
Controller-Partition状态机
Controller-Replica状态机
Controller-分区副本重分配(PartitionReassignment)与Preferred leader副本选举
Controller-Broker的上线与下线
Controller-LeaderAndIsr请求
Topic 的新建/扩容/删除
由于代码和注释比较多,在此略过。
参考资料:
1.Kafka运维填坑
2.Matt's Blog
3.What is Kafka’s controller broker
4.ZooKeeper:A Distributed Coordination Service for Distributed Applications
作者简介
袁吉
腾讯运营规划工程师
腾讯运营规划工程师,目前负责腾讯游戏万亿级实时数仓、BG数据中台的运营工作。有丰富的消息中间件,分布式大数据处理引擎的运营管理经验。
推荐阅读
LLVM极简教程:9个步骤!实现一个简单编译器
避坑指南!手把手带你解读html2canvas的实现原理
10分钟了解Flutter跨平台运行原理!
如何在C++20中实现Coroutine及相关任务调度器?(实例教学)



