- Preface
- Source Code Analysis
- 1. Kafka Server Startup Flow
- 2. Kafka Server Handling of New Connections
- 3. Kafka Server Request Processing Flow
In Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构 I introduced the components that make up Kafka 3.0. Following that structure, this article divides into three parts, whose main sequence is shown in the figure below:
Source Code Analysis: 1. Kafka Server Startup Flow
The Kafka server starts from the Kafka.scala#main() method as its entry point; the main steps are:
- Call Kafka.scala#getPropsFromArgs() to load the configuration file specified in the startup arguments into memory
- Call Kafka.scala#buildServer() to create the Kafka server instance
- Call the interface method Server.scala#startup() on the newly created server instance to start the server
def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val server = buildServer(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.")
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }

    server.awaitShutdown()
  } catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
The core of Kafka.scala#getPropsFromArgs() is a call to Utils#loadProps() to load the specified configuration file. The logic here is straightforward, so we will not dig into it.
def getPropsFromArgs(args: Array[String]): Properties = {
  val optionParser = new OptionParser(false)
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
  // This is just to make the parameter show up in the help output, we are not actually using this due the
  // fact that this class ignores the first parameter which is interpreted as positional and mandatory
  // but would not be mandatory if --version is specified
  // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
  optionParser.accepts("version", "Print version information and exit.")

  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser,
      "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
  }

  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }

  val props = Utils.loadProps(args(0))

  if (args.length > 1) {
    val options = optionParser.parse(args.slice(1, args.length): _*)

    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }

    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  props
}
Kafka.scala#buildServer(), which creates the server instance, is a key step. Note the following:
- KafkaConfig.scala#fromProps() converts the configuration loaded into memory into a KafkaConfig object
- KafkaConfig.scala#requiresZookeeper() determines the Kafka server's startup mode. The decision rests on whether the process.roles configuration is present: if it is, the server starts in KRaft mode, which removes the ZooKeeper dependency; otherwise it starts in the legacy ZooKeeper-dependent mode
- This article is based on Kafka 3.0, where KRaft support is already fairly stable, so the analysis uses KRaft mode as the example; in this case a KafkaRaftServer object is created
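The mode decision above can be sketched in isolation. This is a hedged, standalone sketch (StartupMode is an illustrative name, not Kafka's real helper); it only shows that the choice hinges on the presence of process.roles in the loaded configuration:

```scala
import java.util.Properties

// Illustrative stand-in for KafkaConfig#requiresZookeeper: the server falls
// back to the legacy ZooKeeper mode exactly when process.roles is absent.
object StartupMode {
  def requiresZookeeper(props: Properties): Boolean =
    !props.containsKey("process.roles")
}
```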
private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  if (config.requiresZookeeper) {
    new KafkaServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None,
      enableForwarding = false
    )
  } else {
    new KafkaRaftServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None
    )
  }
}
Scala's syntax differs from Java's in quite a few ways; for example, in Scala the constructor is attached directly to the class declaration. Creating the KafkaRaftServer object therefore triggers the creation of several key member objects. The ones directly relevant to this article are:
- broker: a BrokerServer object, created only when the node's process.roles configuration includes the broker role; it handles message-data requests such as message production and consumption
- controller: a ControllerServer object, created only when the node's process.roles configuration includes the controller role; it handles metadata requests such as topic creation and deletion
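The role-to-component mapping above can be sketched as follows. This is a hedged illustration (RoleParser, parseRoles, and componentsFor are invented names): each role in the comma-separated process.roles value independently gates the creation of one server component:

```scala
// Illustrative sketch: which components would a node with this process.roles host?
object RoleParser {
  def parseRoles(processRoles: String): Set[String] =
    processRoles.split(',').map(_.trim).filter(_.nonEmpty).toSet

  // Each role gates one component, mirroring the two Option fields in KafkaRaftServer.
  def componentsFor(processRoles: String): Seq[String] = {
    val roles = parseRoles(processRoles)
    Seq("broker" -> "BrokerServer", "controller" -> "ControllerServer")
      .collect { case (role, component) if roles.contains(role) => component }
  }
}
```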
class KafkaRaftServer(
  config: KafkaConfig,
  time: Time,
  threadNamePrefix: Option[String]
) extends Server with Logging {

  KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
  KafkaYammerMetrics.INSTANCE.configure(config.originals)

  private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)

  private val metrics = Server.initializeMetrics(
    config,
    time,
    metaProps.clusterId
  )

  private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
    RaftConfig.parseVoterConnections(config.quorumVoters))

  private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
    metaProps,
    config,
    new MetadataRecordSerde,
    KafkaRaftServer.MetadataPartition,
    KafkaRaftServer.MetadataTopicId,
    time,
    metrics,
    threadNamePrefix,
    controllerQuorumVotersFuture
  )

  private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
    Some(new BrokerServer(
      config,
      metaProps,
      raftManager,
      time,
      metrics,
      threadNamePrefix,
      offlineDirs,
      controllerQuorumVotersFuture,
      Server.SUPPORTED_FEATURES
    ))
  } else {
    None
  }

  private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
    Some(new ControllerServer(
      metaProps,
      config,
      raftManager,
      time,
      metrics,
      threadNamePrefix,
      controllerQuorumVotersFuture
    ))
  } else {
    None
  }

  ...
}
With the steps above complete, the server's Server object has been created as a KafkaRaftServer. The third step listed at the start of this section then starts the server via KafkaRaftServer.scala#startup(). The points relevant to this article are:
- controller.foreach(_.startup()) starts the ControllerServer that may exist on this node by calling its ControllerServer.scala#startup() method
- broker.foreach(_.startup()) starts the BrokerServer that may exist on this node by calling its BrokerServer.scala#startup() method
This article uses the startup of BrokerServer as the example; from the perspective of the network communication structure, BrokerServer and ControllerServer are nearly identical anyway.
override def startup(): Unit = {
  Mx4jLoader.maybeLoad()
  raftManager.startup()
  controller.foreach(_.startup())
  broker.foreach(_.startup())
  AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
  info(KafkaBroker.STARTED_MESSAGE)
}
BrokerServer.scala#startup() is fairly long; the key objects involved are listed below. Stripped to its essentials, though, only two points matter for network communication: the creation, configuration, and startup of the SocketServer low-level network server, and the creation and startup of the KafkaRequestHandlerPool upper-level request handler pool.
- kafkaScheduler: a KafkaScheduler object, the thread pool for scheduled tasks
- metadataCache: a KRaftMetadataCache object, the cluster metadata management component
- clientToControllerChannelManager: a BrokerToControllerChannelManager object, the manager for broker-to-controller connections
- forwardingManager: a ForwardingManagerImpl object that holds the clientToControllerChannelManager and forwards requests that should be handled by the controller
- socketServer: a SocketServer object, the server facing the low-level network
- _replicaManager: a ReplicaManager object, the replica manager responsible for storing and reading messages
- groupCoordinator: a GroupCoordinator object, the coordinator for ordinary consumer groups, which helps assign partitions to the consumers within a group
- dataPlaneRequestProcessor: a KafkaApis object, the upper-level request processor; it holds the low-level network server's request queue socketServer.dataPlaneRequestChannel and takes requests off that queue for processing
- dataPlaneRequestHandlerPool: a KafkaRequestHandlerPool object, the thread pool for the upper-level request processors
def startup(): Unit = {
  if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
  try {
    info("Starting broker")

    kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
    kafkaScheduler.startup()

    _brokerTopicStats = new BrokerTopicStats

    quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))

    logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

    metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)

    // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
    // until we catch up on the metadata log and have up-to-date topic and broker configs.
    logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
      brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)

    // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
    // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
    tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
    credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

    val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
    val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

    clientToControllerChannelManager = BrokerToControllerChannelManager(
      controllerNodeProvider,
      time,
      metrics,
      config,
      channelName = "forwarding",
      threadNamePrefix,
      retryTimeoutMs = 60000
    )
    clientToControllerChannelManager.start()
    forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)

    val apiVersionManager = ApiVersionManager(
      ListenerType.BROKER,
      config,
      Some(forwardingManager),
      brokerFeatures,
      featureCache
    )

    // Create and start the socket server acceptor threads so that the bound port is known.
    // Delay starting processors until the end of the initialization sequence to ensure
    // that credentials have been loaded before processing authentications.
    socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
    socketServer.startup(startProcessingRequests = false)

    clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

    val alterIsrChannelManager = BrokerToControllerChannelManager(
      controllerNodeProvider,
      time,
      metrics,
      config,
      channelName = "alterIsr",
      threadNamePrefix,
      retryTimeoutMs = Long.MaxValue
    )
    alterIsrManager = new DefaultAlterIsrManager(
      controllerChannelManager = alterIsrChannelManager,
      scheduler = kafkaScheduler,
      time = time,
      brokerId = config.nodeId,
      brokerEpochSupplier = () => lifecycleManager.brokerEpoch
    )
    alterIsrManager.start()

    this._replicaManager = new ReplicaManager(config, metrics, time, None,
      kafkaScheduler, logManager, isShuttingDown, quotaManagers,
      brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
      threadNamePrefix)

    if (config.tokenAuthEnabled) {
      throw new UnsupportedOperationException("Delegation tokens are not supported")
    }
    tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
    tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...

    // Create group coordinator, but don't start it until we've started replica manager.
    // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
    groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)

    val producerIdManagerSupplier = () => ProducerIdManager.rpc(
      config.brokerId,
      brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
      clientToControllerChannelManager,
      config.requestTimeoutMs
    )

    // Create transaction coordinator, but don't start it until we've started replica manager.
    // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
    transactionCoordinator = TransactionCoordinator(config, replicaManager,
      new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
      producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)

    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config, Some(clientToControllerChannelManager), None, None, groupCoordinator,
      transactionCoordinator)

    config.dynamicConfig.addReconfigurables(this)

    dynamicConfigHandlers = Map[String, ConfigHandler](
      ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
      ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

    if (!config.processRoles.contains(ControllerRole)) {
      // If no controller is defined, we rely on the broker to generate snapshots.
      metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
        config.nodeId,
        time,
        threadNamePrefix,
        new BrokerSnapshotWriterBuilder(raftManager.client)
      ))
    }

    metadataListener = new BrokerMetadataListener(config.nodeId,
      time,
      threadNamePrefix,
      config.metadataSnapshotMaxNewRecordBytes,
      metadataSnapshotter)

    val networkListeners = new ListenerCollection()
    config.advertisedListeners.foreach { ep =>
      networkListeners.add(new Listener().
        setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
        setName(ep.listenerName.value()).
        setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
        setSecurityProtocol(ep.securityProtocol.id))
    }

    lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
      BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
        "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
      metaProps.clusterId, networkListeners, supportedFeatures)

    // Register a listener with the Raft layer to receive metadata event notifications
    raftManager.register(metadataListener)

    val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
    var interBrokerListener: Endpoint = null
    networkListeners.iterator().forEachRemaining(listener => {
      val endPoint = new Endpoint(listener.name(),
        SecurityProtocol.forId(listener.securityProtocol()),
        listener.host(), listener.port())
      endpoints.add(endPoint)
      if (listener.name().equals(config.interBrokerListenerName.value())) {
        interBrokerListener = endPoint
      }
    })
    if (interBrokerListener == null) {
      throw new RuntimeException("Unable to find inter-broker listener " +
        config.interBrokerListenerName.value() + ". Found listener(s): " +
        endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
    }
    val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
      config.nodeId, endpoints, interBrokerListener)

    authorizer = config.authorizer
    authorizer.foreach(_.configure(config.originals))
    val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
      case Some(authZ) =>
        authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
          ep -> cs.toCompletableFuture
        }
      case None =>
        authorizerInfo.endpoints.asScala.map { ep =>
          ep -> CompletableFuture.completedFuture[Void](null)
        }.toMap
    }

    val fetchManager = new FetchManager(Time.SYSTEM,
      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

    // Create the request processor objects.
    val raftSupport = RaftSupport(forwardingManager, metadataCache)
    dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
      replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
      config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
      fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

    dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
      socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
      config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
      SocketServer.DataPlaneThreadPrefix)

    if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
      throw new RuntimeException(KafkaConfig.ControlPlaneListenerNameProp + " is not " +
        "supported when in KRaft mode.")
    }

    // Block until we've caught up with the latest metadata from the controller quorum.
    lifecycleManager.initialCatchUpFuture.get()

    // Apply the metadata log changes that we've accumulated.
    metadataPublisher = new BrokerMetadataPublisher(config, metadataCache, logManager,
      replicaManager, groupCoordinator, transactionCoordinator, clientQuotaMetadataManager,
      featureCache, dynamicConfigHandlers.toMap)

    // Tell the metadata listener to start publishing its output, and wait for the first
    // publish operation to complete. This first operation will initialize logManager,
    // replicaManager, groupCoordinator, and txnCoordinator. The log manager may perform
    // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
    metadataListener.startPublishing(metadataPublisher).get()

    // Log static broker configurations.
    new KafkaConfig(config.originals(), true)

    // Enable inbound TCP connections.
    socketServer.startProcessingRequests(authorizerFutures)

    // We're now ready to unfence the broker. This also allows this broker to transition
    // from RECOVERY state to RUNNING state, once the controller unfences the broker.
    lifecycleManager.setReadyToUnfence()

    maybeChangeStatus(STARTING, STARTED)
  } catch {
    case e: Throwable =>
      maybeChangeStatus(STARTING, STARTED)
      fatal("Fatal error during broker startup. Prepare to shutdown", e)
      shutdown()
      throw e
  }
}
Creating the SocketServer object triggers the creation of its internal RequestChannel request queue. Its key member fields are:
- maxQueuedRequests: the size of the request queue, determined by the queued.max.requests configuration
- dataPlaneProcessors: a Map caching the data-plane Processors for network connections
- dataPlaneAcceptors: a Map caching the data-plane Acceptors for network connections
- dataPlaneRequestChannel: the data-plane RequestChannel request queue, which buffers incoming requests
- controlPlaneRequestChannelOpt: the control-plane RequestChannel request queue, with a default size of 20. Whether it is created depends on the control.plane.listener.name configuration; in Kafka 3.0's KRaft mode an exception is thrown if this configuration is present
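The essential behavior of the request queue can be reduced to a few lines. This is a hedged miniature (MiniRequestChannel is an invented name, not the real RequestChannel): a bounded queue sized by queued.max.requests, with Processors putting requests in and handler threads polling with a timeout, which is exactly the back-pressure pattern the real class provides:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Miniature stand-in for RequestChannel: a bounded blocking queue.
final class MiniRequestChannel[A](maxQueuedRequests: Int) {
  private val requestQueue = new ArrayBlockingQueue[A](maxQueuedRequests)

  // Called by Processors; blocks when the queue is full (back-pressure).
  def sendRequest(request: A): Unit = requestQueue.put(request)

  // Called by handler threads; None if nothing arrives within the timeout.
  def receiveRequest(timeoutMs: Long): Option[A] =
    Option(requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS))
}
```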
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider,
                   val apiVersionManager: ApiVersionManager)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

  private val maxQueuedRequests = config.queuedMaxRequests
  private val nodeId = config.brokerId
  private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
  this.logIdent = logContext.logPrefix

  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE

  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
  // control-plane
  private var controlPlaneProcessorOpt: Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt: Option[Acceptor] = None
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))

  private var nextProcessorId = 0
  val connectionQuotas = new ConnectionQuotas(config, time, metrics)
  private var startedProcessingRequests = false
  private var stoppedProcessingRequests = false

  ......
}
As seen in step 6 of this section, the SocketServer object's SocketServer.scala#startup() method is called immediately after creation. Its core is:
- Call SocketServer.scala#createControlPlaneAcceptorAndProcessor() to create the control-plane connection acceptor and processor. This exists for compatibility with older versions; Kafka 3.0's KRaft mode does not support the control-plane control.plane.listener.name configuration, so this call can be ignored
- Call SocketServer.scala#createDataPlaneAcceptorsAndProcessors() to create the data-plane connection acceptors and processors. The arguments here come from the default parameters, i.e. the return value of KafkaConfig.scala#dataPlaneListeners()
- Decide based on the startProcessingRequests parameter whether to start the low-level network listeners; at this point they are not started
def startup(startProcessingRequests: Boolean = true,
            controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
            dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
  this.synchronized {
    createControlPlaneAcceptorAndProcessor(controlPlaneListener)
    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
    if (startProcessingRequests) {
      this.startProcessingRequests()
    }
  }

  newGauge(s"${DataPlaneMetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
    val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.iterator.map { p =>
      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
    }
    ioWaitRatioMetricNames.map { metricName =>
      Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
    }.sum / dataPlaneProcessors.size
  })
  newGauge(s"${ControlPlaneMetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
    val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
    }
    ioWaitRatioMetricName.map { metricName =>
      Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
    }.getOrElse(Double.NaN)
  })
  newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
  newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
  newGauge(s"${DataPlaneMetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
    val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.iterator.map { p =>
      metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
    }
    expiredConnectionsKilledCountMetricNames.map { metricName =>
      Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
    }.sum
  })
  newGauge(s"${ControlPlaneMetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
    val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
      metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
    }
    expiredConnectionsKilledCountMetricNames.map { metricName =>
      Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
    }.getOrElse(0.0)
  })
}
SocketServer.scala#createDataPlaneAcceptorsAndProcessors() iterates over the listeners configured via listeners in the configuration file and creates an Acceptor and Processors for each. The key points are:
- First call SocketServer.scala#createAcceptor() to create the connection acceptor Acceptor
- Then call SocketServer.scala#addDataPlaneProcessors() to create the Processors belonging to that acceptor; the number of Processors is determined by the num.network.threads configuration
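The per-listener fan-out above can be sketched before looking at the real code. This is a hedged illustration (ListenerFanOut and the string labels are stand-ins for the real Acceptor/Processor objects): every endpoint gets one acceptor plus num.network.threads processors:

```scala
// Illustrative plan of the per-listener thread fan-out.
object ListenerFanOut {
  // Maps each listener endpoint to (its acceptor, its processors).
  def plan(listeners: Seq[String], numNetworkThreads: Int): Map[String, (String, Seq[String])] =
    listeners.map { endpoint =>
      val acceptor = s"acceptor-$endpoint"
      val processors = (0 until numNetworkThreads).map(i => s"processor-$endpoint-$i")
      endpoint -> (acceptor, processors)
    }.toMap
}
```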
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = {
  endpoints.foreach { endpoint =>
    connectionQuotas.addListener(config, endpoint.listenerName)
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
  }
}
SocketServer.scala#createAcceptor() is shown below; its main job is simply to construct a new SocketServer.scala#Acceptor object.
private def createAcceptor(endPoint: EndPoint, metricPrefix: String): Acceptor = {
  val sendBufferSize = config.socketSendBufferBytes
  val recvBufferSize = config.socketReceiveBufferBytes
  new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time)
}
SocketServer.scala#Acceptor is an inner class; its more important member fields are:
- nioSelector: a Java Selector object responsible for monitoring network connections
- serverChannel: a Java server-side ServerSocketChannel object, created by SocketServer.scala#Acceptor#openServerSocket(), which binds the listening port at creation time
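The essence of openServerSocket() can be shown as a trimmed, runnable sketch (MiniOpenServerSocket is an invented name; the real method's buffer-size tuning and error handling are omitted): open a ServerSocketChannel, switch it to non-blocking mode, which a Selector requires, and bind the listener's host and port:

```scala
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

// Trimmed sketch of Acceptor#openServerSocket.
object MiniOpenServerSocket {
  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if (host == null || host.trim.isEmpty) new InetSocketAddress(port) // wildcard address
      else new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false) // mandatory before registering with a Selector
    serverChannel.socket.bind(socketAddress)
    serverChannel
  }
}
```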
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              nodeId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String,
                              time: Time,
                              logPrefix: String = "")
  extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  this.logIdent = logPrefix
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  private val processors = new ArrayBuffer[Processor]()
  private val processorsStarted = new AtomicBoolean
  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
  private var currentProcessorIndex = 0
  private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()

  ......

  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if (Utils.isBlank(host))
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      serverChannel.socket().setReceiveBufferSize(recvBufferSize)

    try {
      serverChannel.socket.bind(socketAddress)
      info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
    } catch {
      case e: SocketException =>
        throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
    }
    serverChannel
  }

  ......
}
Returning to step 9 of this section, SocketServer.scala#addDataPlaneProcessors() is implemented as follows. Its core is a for loop that creates the Processors; the important processing is:
- Call SocketServer.scala#newProcessor() to create a Processor
- Call RequestChannel.scala#addProcessor() to store the newly created Processor in an internal list; it will later be used to route responses back once request processing completes
- When the for loop ends, call Acceptor#addProcessors() to cache all the created Processors inside the Acceptor; they will later be used when distributing new connections
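When the Acceptor later distributes new connections across these Processors, it does so round-robin, cycling an index (currentProcessorIndex in the real Acceptor). A hedged, standalone sketch of that hand-off (RoundRobinAssigner is an invented name):

```scala
// Illustrative round-robin distribution of connections across processors.
final class RoundRobinAssigner(processorCount: Int) {
  private var currentProcessorIndex = 0

  // Returns the index of the processor that should receive the next connection.
  def nextProcessor(): Int = {
    val chosen = currentProcessorIndex % processorCount
    currentProcessorIndex = (currentProcessorIndex + 1) % processorCount
    chosen
  }
}
```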
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
  val listenerName = endpoint.listenerName
  val securityProtocol = endpoint.securityProtocol
  val listenerProcessors = new ArrayBuffer[Processor]()
  val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName

  for (_ <- 0 until newProcessorsPerListener) {
    val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
      listenerName, securityProtocol, memoryPool, isPrivilegedListener)
    listenerProcessors += processor
    dataPlaneRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
}
SocketServer.scala#Processor is defined as follows; its more important internal fields are:
- newConnections: the new-connection queue, which buffers connections accepted by the Acceptor and assigned to this Processor
- responseQueue: the response queue, which buffers the responses produced when request processing completes
- selector: Kafka's KSelector object, which wraps a Java Selector internally and monitors the connections assigned to this Processor
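The intake side of this structure can be reduced to a small sketch (MiniConnectionIntake is an invented name; strings stand in for SocketChannels): the Acceptor offers accepted channels into the bounded newConnections queue, and the Processor drains it on each loop pass before registering the channels with its selector:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Miniature of the Processor's connection intake queue.
final class MiniConnectionIntake(connectionQueueSize: Int) {
  private val newConnections = new ArrayBlockingQueue[String](connectionQueueSize)

  // Called from the Acceptor thread; returns false when the queue is full.
  def accept(connectionId: String): Boolean = newConnections.offer(connectionId)

  // Called from the Processor's own loop: drain everything queued so far,
  // preserving arrival order.
  def configureNewConnections(): List[String] = {
    var drained = List.empty[String]
    var conn = newConnections.poll()
    while (conn != null) {
      drained = conn :: drained
      conn = newConnections.poll()
    }
    drained.reverse
  }
}
```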
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int,
                               isPrivilegedListener: Boolean,
                               apiVersionManager: ApiVersionManager)
  extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  ......

  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

  private[kafka] val metricTags = mutable.LinkedHashMap(
    ListenerMetricTag -> listenerName.value,
    NetworkProcessorMetricTag -> id.toString
  ).asJava

  newGauge(IdlePercentMetricName, () => {
    Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags))).fold(0.0)(m =>
      Math.min(m.metricValue.asInstanceOf[Double], 1.0))
  },
    // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
    // also includes the listener name)
    Map(NetworkProcessorMetricTag -> id.toString)
  )

  val expiredConnectionsKilledCount = new CumulativeSum()
  private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", MetricsGroup, metricTags)
  metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)

  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(
      listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time,
      logContext,
      () => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
    )
  )

  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      failedAuthenticationDelayMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
  }

  ......
}
Back in step 6 of this section, BrokerServer.scala#startup() assigns the member variable dataPlaneRequestHandlerPool an instance of the KafkaRequestHandler.scala#KafkaRequestHandlerPool class, which holds the KafkaApis object as the upper-level request processor. Its key fields are:
- threadPoolSize: the size of the handler thread pool, determined by the num.io.threads configuration
- runnables: the array of KafkaRequestHandler processors, each created by KafkaRequestHandler.scala#KafkaRequestHandlerPool#createHandler(). The actual handler is a KafkaRequestHandler object wrapping the KafkaApis object, and each one is dropped into a newly created daemon thread for execution as soon as it is constructed
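The pool's shape can be sketched independently of Kafka's classes. This is a hedged miniature (MiniHandlerPool is an invented name; the handler function plays the role KafkaApis plays in the real pool): numThreads daemon threads created up front, all polling one shared queue:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Miniature of KafkaRequestHandlerPool: N daemon threads sharing one queue.
final class MiniHandlerPool(numThreads: Int,
                            queue: LinkedBlockingQueue[String],
                            handle: String => Unit) {
  @volatile private var stopped = false

  private val threads = (0 until numThreads).map { id =>
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        while (!stopped) {
          // Bounded wait, like the real handler's receiveRequest(300).
          val req = queue.poll(50, TimeUnit.MILLISECONDS)
          if (req != null) handle(req)
        }
      }
    }, s"mini-request-handler-$id")
    t.setDaemon(true)
    t.start()
    t
  }

  def shutdown(): Unit = {
    stopped = true
    threads.foreach(_.join())
  }
}
```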
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix: String)
  extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }

  ......
}
The KafkaRequestHandler class is defined as follows. Once started as a thread task, it loops endlessly in KafkaRequestHandler.scala#run(), continuously processing requests received by the lower layer. The key steps are:
- requestChannel.receiveRequest() keeps polling the request queue for requests, blocking the thread when none are available
- apis.handle invokes the interface method ApiRequestHandler#handle(), delivering the request to the processor for handling
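One detail of the loop worth isolating: a sentinel value on the same queue doubles as the shutdown signal, so no separate control channel is needed. A hedged sketch of that dispatch shape (these case classes are stand-ins for RequestChannel.ShutdownRequest and RequestChannel.Request):

```scala
// Illustrative dispatch shape of KafkaRequestHandler#run().
sealed trait MiniRequest
case object MiniShutdownRequest extends MiniRequest
final case class MiniApiRequest(apiName: String) extends MiniRequest

object MiniDispatcher {
  // Returns false when the handler loop should exit, mirroring the `return`
  // taken on ShutdownRequest in the real run() method.
  def dispatch(req: MiniRequest, handle: String => Unit): Boolean = req match {
    case MiniShutdownRequest    => false
    case MiniApiRequest(name)   => handle(name); true
  }
}
```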
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: ApiRequestHandler,
                          time: Time) extends Runnable with Logging {

  this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
  private val shutdownComplete = new CountDownLatch(1)
  private val requestLocal = RequestLocal.withThreadConfinedCaching
  @volatile private var stopped = false

  def run(): Unit = {
    while (!stopped) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val startSelectTime = time.nanoseconds

      val req = requestChannel.receiveRequest(300)
      val endTime = time.nanoseconds
      val idleTime = endTime - startSelectTime
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

      req match {
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          completeShutdown()
          return

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            apis.handle(request, requestLocal)
          } catch {
            case e: FatalExitError =>
              completeShutdown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    completeShutdown()
  }

  ......
}
With the upper-level request processors running, we return to step 6 and the method that starts the low-level network server, SocketServer.scala#startProcessingRequests(). As the source shows, its core is a call to SocketServer.scala#startDataPlaneProcessorsAndAcceptors(); the logic that finally starts the Acceptor and Processors lives in SocketServer.scala#startAcceptorAndProcessors().
def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
  info("Starting socket server acceptors and processors")
  this.synchronized {
    if (!startedProcessingRequests) {
      startControlPlaneProcessorAndAcceptor(authorizerFutures)
      startDataPlaneProcessorsAndAcceptors(authorizerFutures)
      startedProcessingRequests = true
    } else {
      info("Socket server acceptors and processors already started")
    }
  }
  info("Started socket server acceptors and processors")
}

private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
  val interBrokerListener = dataPlaneAcceptors.asScala.keySet
    .find(_.listenerName == config.interBrokerListenerName)
  val orderedAcceptors = interBrokerListener match {
    case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++
      dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
    case None => dataPlaneAcceptors.asScala.values
  }
  orderedAcceptors.foreach { acceptor =>
    val endpoint = acceptor.endPoint
    startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
  }
}
SocketServer.scala#startAcceptorAndProcessors() 方法的实现如下,简单来说分为两步,至此服务端的启动告一段落
- 首先调用SocketServer.scala#Acceptor#startProcessors() 将 Acceptor 内部的 Processor 都扔进线程中启动
- 其次新起线程,将当前 Acceptor 扔进线程中启动
private def startAcceptorAndProcessors(threadPrefix: String,
                                       endpoint: EndPoint,
                                       acceptor: Acceptor,
                                       authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
  debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
  waitForAuthorizerFuture(acceptor, authorizerFutures)
  debug(s"Start processors on listener ${endpoint.listenerName}")
  acceptor.startProcessors(threadPrefix)
  debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
  if (!acceptor.isStarted()) {
    KafkaThread.nonDaemon(
      s"${threadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
      acceptor
    ).start()
    acceptor.awaitStartup()
  }
  info(s"Started $threadPrefix acceptor and processor(s) for endpoint : ${endpoint.listenerName}")
}
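KafkaThread.nonDaemon() 本质上就是创建一个指定名称的非守护线程来承载 Acceptor,线程存活即保证 JVM 不会退出。下面是一个脱离 Kafka 的简化示意(类名与线程名均为笔者虚构,仅说明"命名非守护线程"这一模式):

```java
// Illustrative sketch (not Kafka's KafkaThread): a plainly named, non-daemon
// thread wrapping a runnable, so the JVM stays alive while the loop runs.
public class NamedThreadDemo {
    static Thread nonDaemon(String name, Runnable runnable) {
        Thread t = new Thread(runnable, name);
        t.setDaemon(false);   // non-daemon: keeps the JVM alive until it exits
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread acceptor = nonDaemon("demo-socket-acceptor-PLAINTEXT-9092",
                () -> System.out.println("acceptor loop would run here"));
        acceptor.start();
        acceptor.join();
    }
}
```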
Acceptor 连接接收器启动后,会触发 SocketServer.scala#Acceptor#run() 方法执行,可以看到其关键操作如下:
- 首先通过 serverChannel.register() 将服务端 ServerSocketChannel注册到 Selector 上,并设置监听的事件为 SelectionKey.OP_ACCEPT
- 在死循环中不断调用 SocketServer.scala#Acceptor#acceptNewConnections() 方法接收远端连接
def run(): Unit = {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    while (isRunning) {
      try {
        acceptNewConnections()
        closeThrottledConnections()
      } catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket, selector, and any throttled sockets.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket))
    throttledSockets.clear()
    shutdownComplete()
  }
}
SocketServer.scala#Acceptor#acceptNewConnections() 方法其实就是标准的 NIO 处理,可以看到核心如下:
- 首先调用 nioSelector.select() 轮询底层连接,如有连接就绪则调用 nioSelector.selectedKeys() 获取事件处理
- 如果是接收连接事件,则调用SocketServer.scala#Acceptor#accept()进行连接接收,随后按照连接计数器取模选定一个 Processor,最后调用SocketServer.scala#Acceptor#assignNewConnection() 方法将连接分配给选定的 Processor
private def acceptNewConnections(): Unit = {
  val ready = nioSelector.select(500)
  if (ready > 0) {
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()
    while (iter.hasNext && isRunning) {
      try {
        val key = iter.next
        iter.remove()

        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            // Assign the channel to the next processor (using round-robin) to which the
            // channel can be added without blocking. If newConnections queue is full on
            // all processors, block until the last one is able to accept a connection.
            var retriesLeft = synchronized(processors.length)
            var processor: Processor = null
            do {
              retriesLeft -= 1
              processor = synchronized {
                // adjust the index (if necessary) and retrieve the processor atomically for
                // correct behaviour in case the number of processors is reduced dynamically
                currentProcessorIndex = currentProcessorIndex % processors.length
                processors(currentProcessorIndex)
              }
              currentProcessorIndex += 1
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        case e: Throwable => error("Error while accepting connection", e)
      }
    }
  }
}
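上述按连接计数器取模轮询选择 Processor 的逻辑,可以抽象成如下独立的 Java 示意(非 Kafka 源码,类名为笔者虚构;先取模再自增,使得 Processor 数量动态缩减时下标也不会越界):

```java
// Illustrative sketch (not Kafka code) of the acceptor's round-robin choice:
// clamp the running index with modulo (the processor pool can shrink at
// runtime), return that processor, then advance the counter.
public class RoundRobinDemo {
    static int currentIndex = 0;

    static int pickProcessor(int processorCount) {
        currentIndex = currentIndex % processorCount;  // re-clamp in case the pool shrank
        int chosen = currentIndex;
        currentIndex += 1;
        return chosen;
    }

    public static void main(String[] args) {
        // 3 processors: five new connections land on processors 0,1,2,0,1
        for (int i = 0; i < 5; i++)
            System.out.print(pickProcessor(3) + " ");
    }
}
```

真实源码中取模与取元素放在 synchronized 块内原子完成,这里为突出算法省去了并发细节。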
SocketServer.scala#Acceptor#accept()内部调用 serverSocketChannel.accept() 进行标准的连接接收处理,可以看到此处配置了新连接的一些关键属性
private def accept(key: SelectionKey): Option[SocketChannel] = {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
      close(endPoint.listenerName, socketChannel)
      None
    case e: ConnectionThrottledException =>
      val ip = socketChannel.socket.getInetAddress
      debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
      val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
      throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
      None
  }
}
SocketServer.scala#Acceptor#assignNewConnection() 方法的核心是调用 SocketServer.scala#Processor#accept() 方法将新建立的连接丢到 Processor 的新连接队列
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
  if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
    debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
      s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
      s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]" +
      s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
    true
  } else
    false
}
SocketServer.scala#Processor#accept()方法主要处理是新连接入队,其次也会调用 SocketServer.scala#Processor#wakeup() 唤醒 Processor 的 Selector ,至此新连接的处理分配告一段落
def accept(socketChannel: SocketChannel,
           mayBlock: Boolean,
           acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
  val accepted = {
    if (newConnections.offer(socketChannel))
      true
    else if (mayBlock) {
      val startNs = time.nanoseconds
      newConnections.put(socketChannel)
      acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
      true
    } else
      false
  }
  if (accepted)
    wakeup()
  accepted
}
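Processor#accept() 中"先非阻塞 offer、失败且允许阻塞时再 put"的两段式入队,可以用 JDK 的 ArrayBlockingQueue 做一个独立示意(非 Kafka 源码,类名与方法名为笔者虚构):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative sketch (not Kafka code) of the two-step enqueue onto a bounded
// queue: try a non-blocking offer() first; only when the caller permits
// blocking (every processor's queue was full) fall back to a blocking put().
public class BoundedEnqueueDemo {
    static <T> boolean accept(ArrayBlockingQueue<T> queue, T item, boolean mayBlock) {
        if (queue.offer(item))
            return true;               // fast path: the queue had room
        if (mayBlock) {
            try {
                queue.put(item);       // slow path: wait until room frees up
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;                  // full and not allowed to block
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        System.out.println(accept(q, "conn-1", false));  // true: fits
        System.out.println(accept(q, "conn-2", false));  // false: full, no blocking
    }
}
```

只有最后一次重试(retriesLeft == 0)才允许阻塞,对应 Acceptor 轮询完所有 Processor 仍无空位的情形。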
新连接入队后,需要将其注册到 Processor的 Selector 上才能实现读写事件监听,这些主要在SocketServer.scala#Processor#run()方法中处理,可以看到这个方法关键逻辑如下:
- 首先调用 SocketServer.scala#Processor#configureNewConnections()方法将新连接列表中的连接注册到 Selector 上
- 接下来调用 SocketServer.scala#Processor#processNewResponses()方法将响应列表中的响应存入连接缓存,注册监听可写事件SelectionKey.OP_WRITE
- 调用 SocketServer.scala#Processor#poll()方法处理连接上的可读事件,将网络数据暂存到接收缓冲区
- 调用 SocketServer.scala#Processor#processCompletedReceives()方法将接收缓冲区的网络数据解析为 Kafka 请求,并将其扔进请求队列,等待请求处理器轮询处理
override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
        closeExcessConnections()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
SocketServer.scala#Processor#configureNewConnections()方法的关键处理其实就是从新连接列表 newConnections 中拿到连接对象,随后调用 selector.register() 将其注册到 Selector上并设置监听事件为 SelectionKey.OP_READ
private def configureNewConnections(): Unit = {
  var connectionsProcessed = 0
  while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      selector.register(connectionId(channel.socket), channel)
      connectionsProcessed += 1
    } catch {
      // We explicitly catch all exceptions and close the socket to avoid a socket leak.
      case e: Throwable =>
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
        close(listenerName, channel)
        processException(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}
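将新连接设置为非阻塞并注册到 Selector 监听 OP_READ,这一底层动作可以用一段独立可运行的 Java NIO 示例还原(与 Kafka 对 Selector 的封装无关,此处用本机回环连接模拟 accept 出来的 SocketChannel):

```java
import java.net.InetSocketAddress;
import java.nio.channels.*;

// Standalone NIO sketch (not Kafka code) of what registering a new connection
// boils down to: make the accepted channel non-blocking, then register it with
// the processor's Selector with interest in OP_READ.
public class RegisterReadDemo {
    static int registerForRead() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            SocketChannel accepted = server.accept();       // the "new connection"

            accepted.configureBlocking(false);              // selectors require non-blocking channels
            SelectionKey key = accepted.register(selector, SelectionKey.OP_READ);
            int ops = key.interestOps();

            client.close();
            accepted.close();
            return ops;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(registerForRead() == SelectionKey.OP_READ);  // true
    }
}
```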
SocketServer.scala#Processor#poll()方法如下,可以看到核心是调用 Selector.java#poll() 方法去处理底层连接的读写事件
private def poll(): Unit = {
  val pollTimeout = if (newConnections.isEmpty) 300 else 0
  try selector.poll(pollTimeout)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      // The exception is not re-thrown and any completed sends/receives/connections/disconnections
      // from this poll will be processed.
      error(s"Processor $id poll failed", e)
  }
}
Selector.java#poll() 方法内部也是标准的 NIO 操作处理,轮询出来的读写就绪连接都交给了 Selector.java#pollSelectionKeys()方法处理
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    boolean madeReadProgressLastCall = madeReadProgressLastPoll;
    clear();

    boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

    if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
        timeout = 0;

    if (!memoryPool.isOutOfMemory() && outOfMemory) {
        //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
        log.trace("Broker no longer low on memory - unmuting incoming sockets");
        for (KafkaChannel channel : channels.values()) {
            if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                channel.maybeUnmute();
            }
        }
        outOfMemory = false;
    }

    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // Close channels that were delayed and are now ready to be closed
    completeDelayedChannelClose(endIo);

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}
Selector.java#pollSelectionKeys()方法的关键处理有两处:
- 调用 Selector.java#attemptRead()方法接收可读连接上的数据,并将其暂存在接收缓冲区
- 调用 Selector.java#attemptWrite()方法尝试将该连接发送缓冲区的数据写入Socket
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
        boolean sendFailed = false;
        String nodeId = channel.id();

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(nodeId);
        if (idleExpiryManager != null)
            idleExpiryManager.update(nodeId, currentTimeNanos);

        try {
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(nodeId);
                    this.sensors.connectionCreated.record();

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            nodeId);
                } else {
                    continue;
                }
            }

            if (channel.isConnected() && !channel.ready()) {
                channel.prepare();
                if (channel.ready()) {
                    long readyTimeMs = time.milliseconds();
                    boolean isReauthentication = channel.successfulAuthentications() > 1;
                    if (isReauthentication) {
                        sensors.successfulReauthentication.record(1.0, readyTimeMs);
                        if (channel.reauthenticationLatencyMs() == null)
                            log.warn(
                                "Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
                        else
                            sensors.reauthenticationLatency
                                .record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
                    } else {
                        sensors.successfulAuthentication.record(1.0, readyTimeMs);
                        if (!channel.connectedClientSupportsReauthentication())
                            sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
                    }
                    log.debug("Successfully {}authenticated with {}", isReauthentication ?
                        "re-" : "", channel.socketDescription());
                }
            }
            if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
                channel.state(ChannelState.READY);
            Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
            responseReceivedDuringReauthentication.ifPresent(receive -> {
                long currentTimeMs = time.milliseconds();
                addToCompletedReceives(channel, receive, currentTimeMs);
            });

            //if channel is ready and has bytes to read from socket or buffer, and has no
            //previous completed receive then read from it
            if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
                    && !explicitlyMutedChannels.contains(channel)) {
                attemptRead(channel);
            }

            if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
                //this channel has bytes enqueued in intermediary buffers that we could not read
                //(possibly because no memory). it may be the case that the underlying socket will
                //not come up in the next poll() and so we need to remember this channel for the
                //next poll call otherwise data may be stuck in said buffers forever. If we attempt
                //to process buffered data and no progress is made, the channel buffered status is
                //cleared to avoid the overhead of checking every time.
                keysWithBufferedRead.add(key);
            }

            long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
            try {
                attemptWrite(key, channel, nowNanos);
            } catch (Exception e) {
                sendFailed = true;
                throw e;
            }

            if (!key.isValid())
                close(channel, CloseMode.GRACEFUL);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException) {
                log.debug("Connection with {} disconnected", desc, e);
            } else if (e instanceof AuthenticationException) {
                boolean isReauthentication = channel.successfulAuthentications() > 0;
                if (isReauthentication)
                    sensors.failedReauthentication.record();
                else
                    sensors.failedAuthentication.record();
                String exceptionMessage = e.getMessage();
                if (e instanceof DelayedResponseAuthenticationException)
                    exceptionMessage = e.getCause().getMessage();
                log.info("Failed {}authentication with {} ({})", isReauthentication ? "re-" : "",
                    desc, exceptionMessage);
            } else {
                log.warn("Unexpected error from {}; closing connection", desc, e);
            }

            if (e instanceof DelayedResponseAuthenticationException)
                maybeDelayCloseOnAuthenticationFailure(channel);
            else
                close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
Selector.java#attemptRead()方法会从连接上读取网络数据,并调用 Selector.java#addToCompletedReceives()方法将接收到的网络数据暂存到缓冲区
private void attemptRead(KafkaChannel channel) throws IOException {
    String nodeId = channel.id();

    long bytesReceived = channel.read();
    if (bytesReceived != 0) {
        long currentTimeMs = time.milliseconds();
        sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
        madeReadProgressLastPoll = true;

        NetworkReceive receive = channel.maybeCompleteReceive();
        if (receive != null) {
            addToCompletedReceives(channel, receive, currentTimeMs);
        }
    }
    if (channel.isMuted()) {
        outOfMemory = true; //channel has muted itself due to memory pressure.
    } else {
        madeReadProgressLastPoll = true;
    }
}
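channel.maybeCompleteReceive() 背后是 Kafka 在字节流之上做的"4 字节长度前缀 + 负载"分帧:只有负载完整到达时,一次接收才算完成。下面用一段独立的 Java 示例演示这种分帧思路(简化示意,方法名为笔者虚构,并非 NetworkReceive 的真实实现):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch (not Kafka's NetworkReceive) of size-delimited framing:
// a 4-byte big-endian length prefix followed by that many payload bytes; the
// receive only "completes" once the whole payload is available.
public class SizeDelimitedDemo {
    // returns the payload if the buffer holds one complete frame, else null
    static byte[] maybeCompleteReceive(ByteBuffer buf) {
        if (buf.remaining() < 4)
            return null;                 // size prefix not fully arrived yet
        buf.mark();
        int size = buf.getInt();         // big-endian by default, like the wire format
        if (buf.remaining() < size) {
            buf.reset();                 // rewind: wait for more bytes
            return null;
        }
        byte[] payload = new byte[size];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] body = "ping".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body);
        wire.flip();
        System.out.println(new String(maybeCompleteReceive(wire), StandardCharsets.UTF_8));  // ping
    }
}
```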
回到本节步骤1, SocketServer.scala#Processor#processCompletedReceives() 方法会将上一步接收到的网络数据解析为上层处理器能够处理的 Kafka 请求,其关键点如下,至此 Kafka 请求处理的关键流程基本结束
- 调用 SocketServer.scala#Processor#parseRequestHeader() 解析出请求的头信息
- 调用 RequestChannel.scala#Request 构造方法将网络数据入参创建 Kafka 请求,其内部将根据 Kafka 自定义的协议完成请求的解析工作
- requestChannel.sendRequest() 触发 RequestChannel.scala#sendRequest() 方法,将解析完成的请求扔进请求队列,上层处理器 KafkaRequestHandler 将调用 RequestChannel.scala#receiveRequest() 方法轮询请求队列完成请求处理,也就是本文第一节步骤15的流程
private def processCompletedReceives(): Unit = {
  selector.completedReceives.forEach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          val header = parseRequestHeader(receive.payload)
          if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
            () => time.nanoseconds()))
            trace(s"Begin re-authentication: $channel")
          else {
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // be sure to decrease connection count and drop any in-flight responses
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              val connectionId = receive.source
              val context = new RequestContext(header, connectionId, channel.socketAddress,
                channel.principal, listenerName, securityProtocol,
                channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)

              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)

              // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
              // and version. It is done here to avoid wiring things up to the api layer.
              if (header.apiKey == ApiKeys.API_VERSIONS) {
                val apiVersionsRequest = req.body[ApiVersionsRequest]
                if (apiVersionsRequest.isValid) {
                  channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                    apiVersionsRequest.data.clientSoftwareName,
                    apiVersionsRequest.data.clientSoftwareVersion))
                }
              }
              requestChannel.sendRequest(req)
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        case None =>
          // This should never happen since completed receives are processed immediately after `poll()`
          throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      }
    } catch {
      // note that even though we got an exception, we can assume that receive.source is valid.
      // Issues with constructing a valid receive object were handled earlier
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }
  selector.clearCompletedReceives()
}
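parseRequestHeader() 从请求负载开头读出的头信息,大致对应 Kafka 协议 v1 版请求头布局:api_key(INT16)、api_version(INT16)、correlation_id(INT32)、client_id(带 INT16 长度前缀的字符串)。下面用独立 Java 示例演示这种解析(简化示意:真实 broker 会按 api key/version 选择头部版本,client_id 还可能为 null,这里均未处理):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch (not Kafka's RequestHeader) of parsing the v1-style
// request header fields from the front of a request payload.
public class HeaderParseDemo {
    record Header(short apiKey, short apiVersion, int correlationId, String clientId) {}

    static Header parse(ByteBuffer buf) {
        short apiKey = buf.getShort();          // INT16 api_key
        short apiVersion = buf.getShort();      // INT16 api_version
        int correlationId = buf.getInt();       // INT32 correlation_id
        short clientIdLen = buf.getShort();     // length prefix (-1 would mean null, unhandled here)
        byte[] clientId = new byte[clientIdLen];
        buf.get(clientId);
        return new Header(apiKey, apiVersion, correlationId,
                new String(clientId, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] id = "demo".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(10 + id.length)
            .putShort((short) 18)               // e.g. ApiKeys.API_VERSIONS
            .putShort((short) 3)                // api version
            .putInt(42)                         // correlation id
            .putShort((short) id.length)
            .put(id);
        buf.flip();
        Header h = parse(buf);
        System.out.println(h.apiKey() + " " + h.correlationId() + " " + h.clientId());  // 18 42 demo
    }
}
```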



