
Kafka 3.0 Source Notes (2): Kafka Server Startup and Request Processing


Contents
  • Preface
  • Source Analysis
    • 1. Kafka server startup flow
    • 2. Handling new connections on the Kafka server
    • 3. Kafka server request processing flow

Preface

In Kafka 3.0 Source Notes (1) - The Network Communication Architecture of the Kafka Server, I introduced the components that make up Kafka 3.0. Building on that, this article is organized into three parts, whose main sequence is shown in the diagram below:

  1. Kafka server startup flow
  2. Handling new connections on the Kafka server
  3. Kafka server request processing flow

Source Analysis

1. Kafka server startup flow

  1. Kafka server startup is entered through the Kafka.scala#main() method. Its main steps are:

    1. Call Kafka.scala#getPropsFromArgs() to load the configuration file specified by the startup arguments into memory
    2. Call Kafka.scala#buildServer() to create the Kafka server instance object
    3. Call the Server.scala#startup() interface method on the created server instance to start the server
    def main(args: Array[String]): Unit = {
     try {
       val serverProps = getPropsFromArgs(args)
       val server = buildServer(serverProps)
    
       try {
         if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
           new LoggingSignalHandler().register()
       } catch {
         case e: ReflectiveOperationException =>
           warn("Failed to register optional signal handler that logs a message when the process is terminated " +
             s"by a signal. Reason for registration failure is: $e", e)
       }
    
       // attach shutdown handler to catch terminating signals as well as normal termination
       Exit.addShutdownHook("kafka-shutdown-hook", {
         try server.shutdown()
         catch {
           case _: Throwable =>
             fatal("Halting Kafka.")
             // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
             Exit.halt(1)
         }
       })
    
       try server.startup()
       catch {
         case _: Throwable =>
           // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
           fatal("Exiting Kafka.")
           Exit.exit(1)
       }
    
       server.awaitShutdown()
     }
     catch {
       case e: Throwable =>
         fatal("Exiting Kafka due to fatal exception", e)
         Exit.exit(1)
     }
     Exit.exit(0)
    }
    
  2. The core of Kafka.scala#getPropsFromArgs() is a call to Utils#loadProps() to load the specified configuration file. This logic is straightforward, so we won't dig into it further

    def getPropsFromArgs(args: Array[String]): Properties = {
     val optionParser = new OptionParser(false)
     val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
       .withRequiredArg()
       .ofType(classOf[String])
     // This is just to make the parameter show up in the help output, we are not actually using this due the
     // fact that this class ignores the first parameter which is interpreted as positional and mandatory
     // but would not be mandatory if --version is specified
     // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
     optionParser.accepts("version", "Print version information and exit.")
    
     if (args.length == 0 || args.contains("--help")) {
       CommandLineUtils.printUsageAndDie(optionParser,
         "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
     }
    
     if (args.contains("--version")) {
       CommandLineUtils.printVersionAndDie()
     }
    
     val props = Utils.loadProps(args(0))
    
     if (args.length > 1) {
       val options = optionParser.parse(args.slice(1, args.length): _*)
    
       if (options.nonOptionArguments().size() > 0) {
         CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
       }
    
       props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
     }
     props
    }
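
    The layering behavior of the `--override property=value` arguments above (values passed on the command line win over the file's values) can be sketched in plain Java; `withOverrides` is a hypothetical helper, not Kafka's actual code:

    ```java
    import java.util.Properties;

    public class OverrideDemo {
        // Hypothetical sketch: start from the properties loaded from the file,
        // then apply each --override key=value pair on top, so overrides win.
        static Properties withOverrides(Properties fromFile, String... overrides) {
            Properties merged = new Properties();
            merged.putAll(fromFile);
            for (String kv : overrides) {
                int eq = kv.indexOf('=');
                merged.setProperty(kv.substring(0, eq), kv.substring(eq + 1));
            }
            return merged;
        }

        public static void main(String[] args) {
            Properties file = new Properties();
            file.setProperty("num.network.threads", "3"); // value from server.properties
            Properties merged = withOverrides(file, "num.network.threads=8");
            System.out.println(merged.getProperty("num.network.threads"));
        }
    }
    ```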
    
  3. Kafka.scala#buildServer() creating the server instance object is a critical step. The points to note:

    1. KafkaConfig.scala#fromProps() converts the configuration loaded into memory into a KafkaConfig object
    2. KafkaConfig.scala#requiresZookeeper() determines the Kafka server's startup mode. The decision rests on whether the process.roles configuration is present: if it is set, the server starts in KRaft mode with the ZooKeeper dependency removed; otherwise it starts in the legacy ZooKeeper-dependent mode
    3. This article is based on Kafka 3.0, where KRaft support is already fairly stable, so the analysis uses KRaft mode as the example; here a KafkaRaftServer object is created
      private def buildServer(props: Properties): Server = {
     val config = KafkaConfig.fromProps(props, false)
     if (config.requiresZookeeper) {
       new KafkaServer(
         config,
         Time.SYSTEM,
         threadNamePrefix = None,
         enableForwarding = false
       )
     } else {
       new KafkaRaftServer(
         config,
         Time.SYSTEM,
         threadNamePrefix = None
       )
     }
    }
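
    The mode decision described above can be sketched as a standalone Java snippet; `startupMode` is an illustrative stand-in for KafkaConfig.scala#requiresZookeeper(), not the real implementation:

    ```java
    import java.util.Properties;

    public class ModeDemo {
        // Sketch of the decision: if process.roles is configured, start in
        // KRaft mode; otherwise fall back to the legacy ZooKeeper mode.
        static String startupMode(Properties props) {
            String roles = props.getProperty("process.roles", "").trim();
            return roles.isEmpty() ? "zookeeper" : "kraft";
        }

        public static void main(String[] args) {
            Properties legacy = new Properties();
            Properties kraft = new Properties();
            kraft.setProperty("process.roles", "broker,controller");
            System.out.println(startupMode(legacy));
            System.out.println(startupMode(kraft));
        }
    }
    ```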
    
  4. Scala's syntax differs from Java's in quite a few ways; for example, in Scala the primary constructor is attached directly to the class declaration, so constructing a KafkaRaftServer object triggers the creation of several key member objects. The ones directly relevant to this article are:

    1. broker: a BrokerServer object, created only when the node's process.roles configuration includes the broker role; it handles message-data requests such as message production and consumption
    2. controller: a ControllerServer object, created only when the node's process.roles configuration includes the controller role; it handles metadata requests such as topic creation and deletion
    class KafkaRaftServer(
     config: KafkaConfig,
     time: Time,
     threadNamePrefix: Option[String]
    ) extends Server with Logging {
    
    KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
    KafkaYammerMetrics.INSTANCE.configure(config.originals)
    
    private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
    
    private val metrics = Server.initializeMetrics(
     config,
     time,
     metaProps.clusterId
    )
    
    private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
     RaftConfig.parseVoterConnections(config.quorumVoters))
    
    private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
     metaProps,
     config,
      new MetadataRecordSerde,
      KafkaRaftServer.MetadataPartition,
      KafkaRaftServer.MetadataTopicId,
     time,
     metrics,
     threadNamePrefix,
     controllerQuorumVotersFuture
    )
    
    private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
     Some(new BrokerServer(
       config,
       metaProps,
       raftManager,
       time,
       metrics,
       threadNamePrefix,
       offlineDirs,
       controllerQuorumVotersFuture,
       Server.SUPPORTED_FEATURES
     ))
    } else {
     None
    }
    
    private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
     Some(new ControllerServer(
       metaProps,
       config,
       raftManager,
       time,
       metrics,
       threadNamePrefix,
       controllerQuorumVotersFuture
     ))
    } else {
     None
    }
    
    ...
    
    }
    
  5. With the steps above, the Kafka server's Server object is fully created; in our case it is a KafkaRaftServer. The third sub-step of step 1 in this section then calls KafkaRaftServer.scala#startup() to start the server. The points relevant to this article are:

    1. controller.foreach(_.startup()) starts the ControllerServer that may exist on this node, by calling its ControllerServer.scala#startup() method
    2. broker.foreach(_.startup()) starts the BrokerServer that may exist on this node, by calling its BrokerServer.scala#startup() method

    The following analysis uses the startup of BrokerServer as the example; from the perspective of the network communication structure, BrokerServer and ControllerServer are in fact almost identical

    override def startup(): Unit = {
     Mx4jLoader.maybeLoad()
     raftManager.startup()
     controller.foreach(_.startup())
     broker.foreach(_.startup())
     AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
     info(KafkaBroker.STARTED_MESSAGE)
    }
    
  6. BrokerServer.scala#startup() is rather long; the key objects involved are listed below. Stripped to the essentials, only two things matter for network communication: the creation, configuration, and startup of the low-level network server SocketServer, and the creation and startup of the upper-level request handler pool KafkaRequestHandlerPool

    1. kafkaScheduler: a KafkaScheduler object, the thread pool for scheduled tasks
    2. metadataCache: a KRaftMetadataCache object, the cluster metadata management component
    3. clientToControllerChannelManager: a BrokerToControllerChannelManager object, the connection manager from this broker to the controller
    4. forwardingManager: a ForwardingManagerImpl object that holds clientToControllerChannelManager and forwards requests that should be handled by the controller
    5. socketServer: a SocketServer object, the server facing the underlying network
    6. _replicaManager: a ReplicaManager object, the replica manager responsible for storing and reading messages
    7. groupCoordinator: a GroupCoordinator object, the coordinator for ordinary consumer groups, which helps assign partitions among the consumers in a group
    8. dataPlaneRequestProcessor: a KafkaApis object, the upper-level request processor; it holds the low-level network server's request queue socketServer.dataPlaneRequestChannel and takes requests from that queue for processing
    9. dataPlaneRequestHandlerPool: a KafkaRequestHandlerPool object, the thread pool for the upper-level request handlers
    def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     try {
       info("Starting broker")
    
       
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
       kafkaScheduler.startup()
    
       
       _brokerTopicStats = new BrokerTopicStats
    
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
    
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
    
        metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
    
       // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
       // until we catch up on the metadata log and have up-to-date topic and broker configs.
       logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
          brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
    
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
    
       val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
    
       clientToControllerChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
         channelName = "forwarding",
         threadNamePrefix,
         retryTimeoutMs = 60000
       )
       clientToControllerChannelManager.start()
       forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
    
       val apiVersionManager = ApiVersionManager(
         ListenerType.BROKER,
         config,
         Some(forwardingManager),
         brokerFeatures,
         featureCache
       )
    
       // Create and start the socket server acceptor threads so that the bound port is known.
       // Delay starting processors until the end of the initialization sequence to ensure
       // that credentials have been loaded before processing authentications.
       socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
       socketServer.startup(startProcessingRequests = false)
    
        clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
    
       val alterIsrChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
         channelName = "alterIsr",
         threadNamePrefix,
         retryTimeoutMs = Long.MaxValue
       )
       alterIsrManager = new DefaultAlterIsrManager(
         controllerChannelManager = alterIsrChannelManager,
         scheduler = kafkaScheduler,
         time = time,
         brokerId = config.nodeId,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch
       )
       alterIsrManager.start()
    
       this._replicaManager = new ReplicaManager(config, metrics, time, None,
         kafkaScheduler, logManager, isShuttingDown, quotaManagers,
         brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
         threadNamePrefix)
    
       
       if (config.tokenAuthEnabled) {
         throw new UnsupportedOperationException("Delegation tokens are not supported")
       }
       tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
       tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
    
       // Create group coordinator, but don't start it until we've started replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
    
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
         config.brokerId,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
         clientToControllerChannelManager,
         config.requestTimeoutMs
       )
    
       // Create transaction coordinator, but don't start it until we've started replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       transactionCoordinator = TransactionCoordinator(config, replicaManager,
         new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
         producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
    
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
         config, Some(clientToControllerChannelManager), None, None,
         groupCoordinator, transactionCoordinator)
    
       
       config.dynamicConfig.addReconfigurables(this)
    
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
    
       if (!config.processRoles.contains(ControllerRole)) {
         // If no controller is defined, we rely on the broker to generate snapshots.
          metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
           config.nodeId,
           time,
           threadNamePrefix,
           new BrokerSnapshotWriterBuilder(raftManager.client)
         ))
       }
    
        metadataListener = new BrokerMetadataListener(config.nodeId,
                                                     time,
                                                     threadNamePrefix,
                                                     config.metadataSnapshotMaxNewRecordBytes,
                                                     metadataSnapshotter)
    
       val networkListeners = new ListenerCollection()
       config.advertisedListeners.foreach { ep =>
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
           setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
           setSecurityProtocol(ep.securityProtocol.id))
       }
        lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
         BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
           "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
         metaProps.clusterId, networkListeners, supportedFeatures)
    
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
    
       val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
       var interBrokerListener: Endpoint = null
       networkListeners.iterator().forEachRemaining(listener => {
         val endPoint = new Endpoint(listener.name(),
           SecurityProtocol.forId(listener.securityProtocol()),
           listener.host(), listener.port())
         endpoints.add(endPoint)
         if (listener.name().equals(config.interBrokerListenerName.value())) {
           interBrokerListener = endPoint
         }
       })
       if (interBrokerListener == null) {
         throw new RuntimeException("Unable to find inter-broker listener " +
           config.interBrokerListenerName.value() + ". Found listener(s): " +
           endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
       }
       val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
         config.nodeId, endpoints, interBrokerListener)
    
       
       authorizer = config.authorizer
       authorizer.foreach(_.configure(config.originals))
       val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
         case Some(authZ) =>
           authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
             ep -> cs.toCompletableFuture
           }
         case None =>
           authorizerInfo.endpoints.asScala.map { ep =>
             ep -> CompletableFuture.completedFuture[Void](null)
           }.toMap
       }
    
       val fetchManager = new FetchManager(Time.SYSTEM,
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
           KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
    
       // Create the request processor objects.
       val raftSupport = RaftSupport(forwardingManager, metadataCache)
       dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
         replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
         config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
         fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
    
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
         config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
         SocketServer.DataPlaneThreadPrefix)
    
       if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
         throw new RuntimeException(KafkaConfig.ControlPlaneListenerNameProp + " is not " +
           "supported when in KRaft mode.")
       }
       // Block until we've caught up with the latest metadata from the controller quorum.
       lifecycleManager.initialCatchUpFuture.get()
    
       // Apply the metadata log changes that we've accumulated.
        metadataPublisher = new BrokerMetadataPublisher(config, metadataCache,
          logManager, replicaManager, groupCoordinator, transactionCoordinator,
          clientQuotaMetadataManager, featureCache, dynamicConfigHandlers.toMap)
    
       // Tell the metadata listener to start publishing its output, and wait for the first
       // publish operation to complete. This first operation will initialize logManager,
       // replicaManager, groupCoordinator, and txnCoordinator. The log manager may perform
       // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
       metadataListener.startPublishing(metadataPublisher).get()
    
       // Log static broker configurations.
       new KafkaConfig(config.originals(), true)
    
       // Enable inbound TCP connections.
       socketServer.startProcessingRequests(authorizerFutures)
    
       // We're now ready to unfence the broker. This also allows this broker to transition
       // from RECOVERY state to RUNNING state, once the controller unfences the broker.
       lifecycleManager.setReadyToUnfence()
    
       maybeChangeStatus(STARTING, STARTED)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)
         fatal("Fatal error during broker startup. Prepare to shutdown", e)
         shutdown()
         throw e
     }
    }
    
  7. Creating the SocketServer object also triggers the creation of its internal request queue, the RequestChannel object. The key internal members are:

    1. maxQueuedRequests: the size of the request queue, determined by the queued.max.requests configuration
    2. dataPlaneProcessors: a Map caching the data-plane Processors for network connections
    3. dataPlaneAcceptors: a Map caching the data-plane Acceptors for network connections
    4. dataPlaneRequestChannel: the data-plane RequestChannel request queue object, which buffers incoming requests
    5. controlPlaneRequestChannelOpt: the control-plane RequestChannel request queue object with a fixed size of 20. Whether it is created depends on the control.plane.listener.name configuration; in KRaft mode in Kafka 3.0, setting that configuration raises an exception
    class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
                    val apiVersionManager: ApiVersionManager)
    extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
    
    private val maxQueuedRequests = config.queuedMaxRequests
    
    private val nodeId = config.brokerId
    
    private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
    
    this.logIdent = logContext.logPrefix
    
    private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
    private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
    private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
    memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
    private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
    // data-plane
    private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
    private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
    val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
    // control-plane
    private var controlPlaneProcessorOpt : Option[Processor] = None
    private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
    val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
      new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
    
    private var nextProcessorId = 0
    val connectionQuotas = new ConnectionQuotas(config, time, metrics)
    private var startedProcessingRequests = false
    private var stoppedProcessingRequests = false
    
    ......
    
    }
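
    The role of the bounded dataPlaneRequestChannel above can be illustrated with a toy stand-in built on java.util.concurrent (the request strings are illustrative only): network Processors put requests in, handler threads take them out, and a full queue pushes back on the network layer.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class ChannelDemo {
        public static void main(String[] args) throws InterruptedException {
            // Toy stand-in for the bounded RequestChannel (queued.max.requests = 2 here).
            BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(2);
            requestQueue.put("produce-request");
            requestQueue.put("fetch-request");
            // offer() fails instead of blocking when the queue is full: back-pressure.
            System.out.println(requestQueue.offer("metadata-request"));
            // Consumers drain the queue in FIFO order.
            System.out.println(requestQueue.take());
        }
    }
    ```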
    
  8. As seen in step 6 of this section, SocketServer.scala#startup() is called immediately after the SocketServer object is created. The core of this method:

    1. Call SocketServer.scala#createControlPlaneAcceptorAndProcessor() to create the control-plane connection acceptor and processor. This exists for compatibility with older versions; KRaft mode in Kafka 3.0 does not support the control-plane control.plane.listener.name configuration, so this call can be ignored here
    2. Call SocketServer.scala#createDataPlaneAcceptorsAndProcessors() to create the data-plane connection acceptors and processors; the arguments here come from the default parameters, i.e. the return value of KafkaConfig.scala#dataPlaneListeners()
    3. The startProcessingRequests parameter decides whether to start listening on the underlying network; at this point it is not started
    def startup(startProcessingRequests: Boolean = true,
               controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
               dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
     this.synchronized {
       createControlPlaneAcceptorAndProcessor(controlPlaneListener)
       createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
       if (startProcessingRequests) {
         this.startProcessingRequests()
       }
     }
    
     newGauge(s"${DataPlaneMetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
       val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.iterator.map { p =>
         metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
       }
       ioWaitRatioMetricNames.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.sum / dataPlaneProcessors.size
     })
     newGauge(s"${ControlPlaneMetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
       val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
         metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
       }
       ioWaitRatioMetricName.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.getOrElse(Double.NaN)
     })
     newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
     newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
     newGauge(s"${DataPlaneMetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
       val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.iterator.map { p =>
         metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
       }
       expiredConnectionsKilledCountMetricNames.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
       }.sum
     })
     newGauge(s"${ControlPlaneMetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
       val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
         metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
       }
       expiredConnectionsKilledCountMetricNames.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
       }.getOrElse(0.0)
     })
    }
    
  9. SocketServer.scala#createDataPlaneAcceptorsAndProcessors() iterates over the listeners configured in the listeners property and creates the corresponding Acceptor and Processors for each one. The key points:

    1. First call SocketServer.scala#createAcceptor() to create the connection acceptor, Acceptor
    2. Then call SocketServer.scala#addDataPlaneProcessors() to create the Processors belonging to that acceptor; the number of Processors is determined by the num.network.threads configuration
    private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                     endpoints: Seq[EndPoint]): Unit = {
     endpoints.foreach { endpoint =>
       connectionQuotas.addListener(config, endpoint.listenerName)
       val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
       addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
       dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
       info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
     }
    }
    
  10. SocketServer.scala#createAcceptor() is shown below; its main action is to construct a new SocketServer.scala#Acceptor object

    private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = {
    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time)
    }
    
  11. SocketServer.scala#Acceptor is an inner class; its key member objects are:

    1. nioSelector: a Java Selector object, responsible for listening for network connections
    2. serverChannel: a Java server-side ServerSocketChannel object, created by SocketServer.scala#Acceptor#openServerSocket(), which binds the listening port at creation time
    private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              nodeId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String,
                              time: Time,
                              logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    
    this.logIdent = logPrefix
    private val nioSelector = NSelector.open()
    val serverChannel = openServerSocket(endPoint.host, endPoint.port)
    private val processors = new ArrayBuffer[Processor]()
    private val processorsStarted = new AtomicBoolean
    private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
    private var currentProcessorIndex = 0
    private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
    
    ......
    
    private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if (Utils.isBlank(host))
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    
    try {
      serverChannel.socket.bind(socketAddress)
      info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
    } catch {
      case e: SocketException =>
        throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
    }
    serverChannel
    }
    
     ······
    
    } 
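
    The essence of openServerSocket() above, minus Kafka's configuration and error wrapping, is standard Java NIO: open a non-blocking ServerSocketChannel and bind it. A minimal sketch (binding port 0 here just to get an ephemeral port):

    ```java
    import java.net.InetSocketAddress;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;

    public class AcceptorDemo {
        public static void main(String[] args) throws Exception {
            // Open a non-blocking server channel and bind it, as openServerSocket() does.
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(0));
            System.out.println(serverChannel.socket().getLocalPort() > 0);

            // In non-blocking mode accept() returns null when no client is waiting,
            // which is why the real Acceptor drives it from a Selector loop.
            SocketChannel client = serverChannel.accept();
            System.out.println(client == null);
            serverChannel.close();
        }
    }
    ```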
    
  12. Returning to step 9 of this section, SocketServer.scala#addDataPlaneProcessors() is implemented as follows. Its core is a for loop that creates Processors, with these important steps:

    1. Call SocketServer.scala#newProcessor() to create a Processor
    2. Call RequestChannel.scala#addProcessor() to save the newly created Processor in an internal list, later used to dispatch responses once request processing completes
    3. After the for loop finishes, call Acceptor#addProcessors() to cache all the created Processors inside the acceptor, later used to assign newly accepted connections
    private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
    val listenerName = endpoint.listenerName
    val securityProtocol = endpoint.securityProtocol
    val listenerProcessors = new ArrayBuffer[Processor]()
    val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
    
    for (_ <- 0 until newProcessorsPerListener) {
      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
        listenerName, securityProtocol, memoryPool, isPrivilegedListener)
      listenerProcessors += processor
      dataPlaneRequestChannel.addProcessor(processor)
      nextProcessorId += 1
    }
    listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
    acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
    }
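
    Once the Processors are registered with the Acceptor, accepted connections are handed out to them in round-robin order. A hypothetical sketch of that assignment pattern (names are illustrative, not Kafka's actual fields):

    ```java
    public class RoundRobinDemo {
        // Illustrative counterpart of the acceptor's rotating processor index.
        static int currentProcessorIndex = 0;

        // Pick the next processor for a newly accepted connection.
        static int nextProcessor(int processorCount) {
            int chosen = currentProcessorIndex % processorCount;
            currentProcessorIndex++;
            return chosen;
        }

        public static void main(String[] args) {
            // With num.network.threads = 3, five connections spread across processors 0,1,2,0,1
            StringBuilder order = new StringBuilder();
            for (int i = 0; i < 5; i++) order.append(nextProcessor(3));
            System.out.println(order);
        }
    }
    ```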
    
  13. SocketServer.scala#Processor is defined as follows; its key internal attributes are:

    1. newConnections: the new-connection queue, buffering connections that the Acceptor has accepted and assigned to this Processor
    2. responseQueue: the response queue, buffering responses produced once requests have been processed
    3. selector: Kafka's KSelector object, which wraps a Java Selector internally and monitors the connections assigned to this Processor
    private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int,
                               isPrivilegedListener: Boolean,
                               apiVersionManager: ApiVersionManager) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    
    ......
    
    private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
    private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
     private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
    
     private[kafka] val metricTags = mutable.LinkedHashMap(
    ListenerMetricTag -> listenerName.value,
    NetworkProcessorMetricTag -> id.toString
    ).asJava
    
    newGauge(IdlePercentMetricName, () => {
    Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags))).fold(0.0)(m =>
      Math.min(m.metricValue.asInstanceOf[Double], 1.0))
    },
    // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
    // also includes the listener name)
    Map(NetworkProcessorMetricTag -> id.toString)
    )
    
    val expiredConnectionsKilledCount = new CumulativeSum()
    private val expiredConnectionsKilledCountMetricName =   metrics.metricName("expired-connections-killed-count", MetricsGroup, metricTags)
    metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
    
    private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(
      listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time,
      logContext,
      () => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
    )
    )
    
    // Visible to override for testing
    protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      failedAuthenticationDelayMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
    }
    
    ......
    
    }
    
  14. Returning to the assignment of the dataPlaneRequestHandlerPool member in BrokerServer.scala#startup() (step 6 of this section): the instance is a KafkaRequestHandler.scala#KafkaRequestHandlerPool object, and it holds the KafkaApis object as the upper-layer request handler. Its key fields are:

    1. threadPoolSize: the size of the handler thread pool, determined by the num.io.threads config
    2. runnables: the array of KafkaRequestHandler processors. Each one is created by KafkaRequestHandler.scala#KafkaRequestHandlerPool#createHandler(); the actual handler is a KafkaRequestHandler wrapping the KafkaApis object, and it is started on a newly created daemon thread as soon as it is built
    class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
    
    private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
    
    private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
    
    this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
    val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
    for (i <- 0 until numThreads) {
    createHandler(i)
    }
    
    def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
    }
    
    ......
    
    }
    
  15. The KafkaRequestHandler class is defined as follows. Started as a thread task, it loops forever in KafkaRequestHandler.scala#run(), continuously processing requests received by the lower layers. Key steps:

    1. requestChannel.receiveRequest() polls the request queue for a request, blocking (with a timeout) when none is available
    2. apis.handle invokes the interface method ApiRequestHandler#handle() to dispatch the request to the processor for handling
    class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: ApiRequestHandler,
                          time: Time) extends Runnable with Logging {
    this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
    private val shutdownComplete = new CountDownLatch(1)
    private val requestLocal = RequestLocal.withThreadConfinedCaching
    @volatile private var stopped = false
    
    def run(): Unit = {
    while (!stopped) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val startSelectTime = time.nanoseconds
    
      val req = requestChannel.receiveRequest(300)
      val endTime = time.nanoseconds
      val idleTime = endTime - startSelectTime
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
    
      req match {
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          completeShutdown()
          return
    
        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            apis.handle(request, requestLocal)
          } catch {
            case e: FatalExitError =>
              completeShutdown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }
    
        case null => // continue
      }
    }
    completeShutdown()
    }
    
    ......
    
    }
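The pool-plus-handler structure above boils down to N daemon threads all polling one shared blocking queue, with a sentinel standing in for RequestChannel.ShutdownRequest. A minimal sketch under those assumptions (HandlerPoolSketch, runPool, and SHUTDOWN are illustrative names, not Kafka's):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HandlerPoolSketch {
    // Stands in for RequestChannel.ShutdownRequest
    static final Object SHUTDOWN = new Object();

    /** Starts numThreads handlers, feeds 5 requests plus one sentinel per thread, returns handled count. */
    static int runPool(int numThreads) throws InterruptedException {
        BlockingQueue<Object> requestQueue = new ArrayBlockingQueue<>(16);
        CountDownLatch done = new CountDownLatch(numThreads);
        ConcurrentLinkedQueue<Object> handled = new ConcurrentLinkedQueue<>();

        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    Object req;
                    try {
                        // like requestChannel.receiveRequest(300): block with a timeout
                        req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) { break; }
                    if (req == SHUTDOWN) break;   // one sentinel stops one handler
                    if (req != null) handled.add(req);
                }
                done.countDown();
            }, "kafka-request-handler-" + i);
            t.setDaemon(true);                    // cf. KafkaThread.daemon(...)
            t.start();
        }

        for (int r = 0; r < 5; r++) requestQueue.put("req-" + r);      // producers: the Processors
        for (int i = 0; i < numThreads; i++) requestQueue.put(SHUTDOWN);
        done.await();
        return handled.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runPool(3));   // all 5 requests handled
    }
}
```

Because the queue is FIFO and the sentinels are enqueued after the requests, every request is drained before the pool shuts down.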
    
  16. With the upper-layer request handlers running, we return to SocketServer.scala#startProcessingRequests() from step 6, which starts the underlying network server. As the source shows, its core is the call to SocketServer.scala#startDataPlaneProcessorsAndAcceptors(), and the logic that finally starts the Acceptor and Processors lives in SocketServer.scala#startAcceptorAndProcessors()

      def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
    info("Starting socket server acceptors and processors")
    this.synchronized {
      if (!startedProcessingRequests) {
        startControlPlaneProcessorAndAcceptor(authorizerFutures)
        startDataPlaneProcessorsAndAcceptors(authorizerFutures)
        startedProcessingRequests = true
      } else {
        info("Socket server acceptors and processors already started")
      }
    }
    info("Started socket server acceptors and processors")
    }
    
    private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
    val interBrokerListener = dataPlaneAcceptors.asScala.keySet
      .find(_.listenerName == config.interBrokerListenerName)
    val orderedAcceptors = interBrokerListener match {
      case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++
        dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
      case None => dataPlaneAcceptors.asScala.values
    }
    orderedAcceptors.foreach { acceptor =>
      val endpoint = acceptor.endPoint
      startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
    }
    }
    
  17. SocketServer.scala#startAcceptorAndProcessors() is implemented as follows. In short there are two steps, and with them the server startup is complete:

    1. First, SocketServer.scala#Acceptor#startProcessors() starts each of the Acceptor's Processors on its own thread
    2. Then a new thread is created and the current Acceptor is started on it
    private def startAcceptorAndProcessors(threadPrefix: String,
                                         endpoint: EndPoint,
                                         acceptor: Acceptor,
                                         authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
    debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
    waitForAuthorizerFuture(acceptor, authorizerFutures)
    debug(s"Start processors on listener ${endpoint.listenerName}")
    acceptor.startProcessors(threadPrefix)
    debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
    if (!acceptor.isStarted()) {
      KafkaThread.nonDaemon(
        s"${threadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
        acceptor
      ).start()
      acceptor.awaitStartup()
    }
    info(s"Started $threadPrefix acceptor and processor(s) for endpoint : ${endpoint.listenerName}")
    }
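The start-then-await handshake in the snippet above (the thread is started, then the caller blocks in acceptor.awaitStartup() until run() signals readiness) can be sketched with a CountDownLatch. The names below are simplified placeholders for what AbstractServerThread does with its internal latches:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class AcceptorStartupSketch {
    /** Start an "acceptor" thread and, like awaitStartup(), block until it is up. */
    static boolean startAcceptor() throws InterruptedException {
        CountDownLatch startup = new CountDownLatch(1);   // cf. the startup latch in AbstractServerThread
        AtomicBoolean running = new AtomicBoolean(true);
        Thread acceptor = new Thread(() -> {
            // bind/register would happen here, before signalling readiness
            startup.countDown();                          // cf. startupComplete() at the top of run()
            while (running.get()) {                       // cf. the isRunning accept loop
                try { Thread.sleep(5); } catch (InterruptedException e) { return; }
            }
        }, "kafka-socket-acceptor");
        acceptor.start();                                 // cf. KafkaThread.nonDaemon(...).start()
        startup.await();                                  // cf. acceptor.awaitStartup()
        boolean started = acceptor.isAlive();
        running.set(false);
        acceptor.join();
        return started;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(startAcceptor());
    }
}
```

The latch guarantees that by the time startAcceptorAndProcessors() returns, the acceptor loop is actually listening, not merely scheduled.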
    
2. How the Kafka Server Handles New Connections
  1. Once the Acceptor is started, SocketServer.scala#Acceptor#run() begins executing. Its key operations are:

    1. First, serverChannel.register() registers the server-side ServerSocketChannel with the Selector, listening for SelectionKey.OP_ACCEPT events
    2. An endless loop then repeatedly calls SocketServer.scala#Acceptor#acceptNewConnections() to accept remote connections
     def run(): Unit = {
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
     startupComplete()
     try {
       while (isRunning) {
         try {
           acceptNewConnections()
           closeThrottledConnections()
         }
         catch {
           // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
           // to a select operation on a specific channel or a bad request. We don't want
           // the broker to stop responding to requests from other clients in these scenarios.
           case e: ControlThrowable => throw e
           case e: Throwable => error("Error occurred", e)
         }
       }
     } finally {
       debug("Closing server socket, selector, and any throttled sockets.")
       CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
       CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
       throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket))
       throttledSockets.clear()
       shutdownComplete()
     }
    }
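Stripped of Kafka specifics, the register-then-loop structure above is plain Java NIO. The self-contained sketch below binds a ServerSocketChannel to an ephemeral loopback port, registers it for OP_ACCEPT, and accepts one in-process client connection (class and method names are illustrative):

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioAcceptSketch {
    /** Minimal version of the Acceptor loop: register OP_ACCEPT, select, accept once. */
    static boolean acceptOne() throws Exception {
        Selector nioSelector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress("127.0.0.1", 0));   // ephemeral port
        serverChannel.configureBlocking(false);
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); // cf. serverChannel.register(...)

        // A client connecting from the same process stands in for a remote client/broker.
        SocketChannel client = SocketChannel.open(
            new InetSocketAddress("127.0.0.1", serverChannel.socket().getLocalPort()));

        SocketChannel accepted = null;
        while (accepted == null) {
            if (nioSelector.select(500) == 0) continue;              // cf. nioSelector.select(500)
            Iterator<SelectionKey> iter = nioSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    accepted = ssc.accept();                         // cf. Acceptor#accept()
                    accepted.configureBlocking(false);
                    accepted.socket().setTcpNoDelay(true);           // same options the Acceptor sets
                }
            }
        }
        boolean ok = accepted.isConnected();
        client.close(); accepted.close(); serverChannel.close(); nioSelector.close();
        return ok;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptOne());
    }
}
```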
    
  2. SocketServer.scala#Acceptor#acceptNewConnections() is standard NIO handling; its core is:

    1. nioSelector.select() polls the underlying connections; when one is ready, nioSelector.selectedKeys() retrieves the events to handle
    2. For an accept event, SocketServer.scala#Acceptor#accept() accepts the connection, a Processor is chosen by taking the connection counter modulo the processor count (round-robin), and SocketServer.scala#Acceptor#assignNewConnection() hands the connection to the chosen Processor
      private def acceptNewConnections(): Unit = {
     val ready = nioSelector.select(500)
     if (ready > 0) {
       val keys = nioSelector.selectedKeys()
       val iter = keys.iterator()
       while (iter.hasNext && isRunning) {
         try {
           val key = iter.next
           iter.remove()
    
           if (key.isAcceptable) {
             accept(key).foreach { socketChannel =>
               // Assign the channel to the next processor (using round-robin) to which the
               // channel can be added without blocking. If newConnections queue is full on
               // all processors, block until the last one is able to accept a connection.
               var retriesLeft = synchronized(processors.length)
               var processor: Processor = null
               do {
                 retriesLeft -= 1
                 processor = synchronized {
                   // adjust the index (if necessary) and retrieve the processor atomically for
                   // correct behaviour in case the number of processors is reduced dynamically
                   currentProcessorIndex = currentProcessorIndex % processors.length
                   processors(currentProcessorIndex)
                 }
                 currentProcessorIndex += 1
               } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
             }
           } else
             throw new IllegalStateException("Unrecognized key state for acceptor thread.")
         } catch {
           case e: Throwable => error("Error while accepting connection", e)
         }
       }
     }
    }
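The processor selection in the loop above is plain round-robin: the index is first reduced modulo the processor count (so it stays valid even if the pool is shrunk dynamically) and then incremented for the next connection. A tiny sketch of just that selection logic:

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {
    // Mirrors the currentProcessorIndex % processors.length selection in acceptNewConnections()
    private int currentProcessorIndex = 0;

    int nextProcessor(int numProcessors) {
        currentProcessorIndex = currentProcessorIndex % numProcessors; // survives a shrinking pool
        int chosen = currentProcessorIndex;
        currentProcessorIndex += 1;
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < 6; i++) order.add(rr.nextProcessor(3));
        System.out.println(order); // connections spread evenly: [0, 1, 2, 0, 1, 2]
    }
}
```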
    
  3. SocketServer.scala#Acceptor#accept() internally calls serverSocketChannel.accept() for the standard accept, and configures several key socket options on the new connection

      private def accept(key: SelectionKey): Option[SocketChannel] = {
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     val socketChannel = serverSocketChannel.accept()
     try {
       connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
       socketChannel.configureBlocking(false)
       socketChannel.socket().setTcpNoDelay(true)
       socketChannel.socket().setKeepAlive(true)
       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
         socketChannel.socket().setSendBufferSize(sendBufferSize)
       Some(socketChannel)
     } catch {
       case e: TooManyConnectionsException =>
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
       case e: ConnectionThrottledException =>
         val ip = socketChannel.socket.getInetAddress
         debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
     }
    }
    
  4. The core of SocketServer.scala#Acceptor#assignNewConnection() is the call to SocketServer.scala#Processor#accept(), which drops the newly established connection into the Processor's new-connection queue

      private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
     if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
       debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
         s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
         s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]" +
         s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
       true
     } else
       false
    }
    
  5. SocketServer.scala#Processor#accept() mainly enqueues the new connection; it also calls SocketServer.scala#Processor#wakeup() to wake the Processor's Selector. With that, the handling and assignment of new connections is complete

      def accept(socketChannel: SocketChannel,
              mayBlock: Boolean,
              acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
     val accepted = {
       if (newConnections.offer(socketChannel))
         true
       else if (mayBlock) {
         val startNs = time.nanoseconds
         newConnections.put(socketChannel)
         acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
         true
       } else
         false
     }
     if (accepted)
       wakeup()
     accepted
    }
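The offer-then-put pattern above implements backpressure: each candidate processor is tried with a non-blocking offer, and only on the final retry (mayBlock) does the Acceptor block on put() until the queue drains. A minimal sketch with an illustrative accept helper:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class ConnectionQueueSketch {
    // Mirrors Processor#accept(): try a non-blocking offer first; only the
    // last candidate processor may block on put() until space frees up.
    static boolean accept(ArrayBlockingQueue<String> newConnections,
                          String conn, boolean mayBlock) throws InterruptedException {
        if (newConnections.offer(conn)) return true;   // fast path: queue has room
        if (mayBlock) {
            newConnections.put(conn);                  // last retry: block until space
            return true;
        }
        return false;                                  // full: Acceptor tries the next processor
    }

    public static void main(String[] args) throws Exception {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1); // cf. connectionQueueSize
        boolean first = accept(q, "conn-1", false);   // fits
        boolean second = accept(q, "conn-2", false);  // queue full, rejected
        q.poll();                                     // the Processor drains one connection
        boolean third = accept(q, "conn-3", true);    // now fits again
        System.out.println(first + " " + second + " " + third);
    }
}
```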
    
3. The Kafka Server's Request-Processing Flow
  1. After a new connection is enqueued, it still has to be registered with the Processor's Selector before read/write events can be monitored. This happens in SocketServer.scala#Processor#run(), whose key logic is:

    1. First, SocketServer.scala#Processor#configureNewConnections() registers the connections from the new-connection queue with the Selector
    2. Next, SocketServer.scala#Processor#processNewResponses() takes responses from the response queue, caches them on their connections, and registers interest in SelectionKey.OP_WRITE events
    3. SocketServer.scala#Processor#poll() handles readable events on the connections, staging the network data into the receive buffer
    4. SocketServer.scala#Processor#processCompletedReceives() parses the buffered network data into Kafka requests and places them on the request queue, where the request handlers poll for them
      override def run(): Unit = {
     startupComplete()
     try {
       while (isRunning) {
         try {
           // setup any new connections that have been queued up
           configureNewConnections()
           // register any new responses for writing
           processNewResponses()
           poll()
           processCompletedReceives()
           processCompletedSends()
           processDisconnected()
           closeExcessConnections()
         } catch {
           // We catch all the throwables here to prevent the processor thread from exiting. We do this because
           // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
           // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
           // be either associated with a specific socket channel or a bad request. These exceptions are caught and
           // processed by the individual methods above which close the failing channel and continue processing other
           // channels. So this catch block should only ever see ControlThrowables.
           case e: Throwable => processException("Processor got uncaught exception.", e)
         }
       }
     } finally {
       debug(s"Closing selector - processor $id")
       CoreUtils.swallow(closeAll(), this, Level.ERROR)
       shutdownComplete()
     }
    }
    
  2. The key processing in SocketServer.scala#Processor#configureNewConnections() is to take connection objects from the newConnections queue and register them with the Selector via selector.register(), listening for SelectionKey.OP_READ events

    private def configureNewConnections(): Unit = {
     var connectionsProcessed = 0
     while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
       val channel = newConnections.poll()
       try {
         debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
         selector.register(connectionId(channel.socket), channel)
         connectionsProcessed += 1
       } catch {
         // We explicitly catch all exceptions and close the socket to avoid a socket leak.
         case e: Throwable =>
           val remoteAddress = channel.socket.getRemoteSocketAddress
           // need to close the channel here to avoid a socket leak.
           close(listenerName, channel)
           processException(s"Processor $id closed connection from $remoteAddress", e)
       }
     }
    }
    
  3. SocketServer.scala#Processor#poll() is shown below; its core is the call to Selector.java#poll() to handle read/write events on the underlying connections

      private def poll(): Unit = {
     val pollTimeout = if (newConnections.isEmpty) 300 else 0
     try selector.poll(pollTimeout)
     catch {
       case e @ (_: IllegalStateException | _: IOException) =>
         // The exception is not re-thrown and any completed sends/receives/connections/disconnections
         // from this poll will be processed.
         error(s"Processor $id poll failed", e)
     }
    }
    
  4. Selector.java#poll() is also standard NIO processing internally; the connections that poll as ready for reading or writing are all handed to Selector.java#pollSelectionKeys()

       public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
    
        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
        clear();
    
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
    
        if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
            timeout = 0;
    
        if (!memoryPool.isOutOfMemory() && outOfMemory) {
            //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
            log.trace("Broker no longer low on memory - unmuting incoming sockets");
            for (KafkaChannel channel : channels.values()) {
                if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                    channel.maybeUnmute();
                }
            }
            outOfMemory = false;
        }
    
        
        long startSelect = time.nanoseconds();
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    
        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
    
            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }
    
            // Poll from channels where the underlying socket has more data
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();
    
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }
    
        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    
        // Close channels that were delayed and are now ready to be closed
        completeDelayedChannelClose(endIo);
    
        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
    }
    
  5. Selector.java#pollSelectionKeys() has two key responsibilities:

    1. Selector.java#attemptRead() receives data from readable connections and stages it in the receive buffer
    2. Selector.java#attemptWrite() attempts to write data buffered for sending on the connection out to the socket
        void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                            boolean isImmediatelyConnected,
                            long currentTimeNanos) {
         for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
             KafkaChannel channel = channel(key);
             long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
             boolean sendFailed = false;
             String nodeId = channel.id();
    
             // register all per-connection metrics at once
             sensors.maybeRegisterConnectionMetrics(nodeId);
             if (idleExpiryManager != null)
                 idleExpiryManager.update(nodeId, currentTimeNanos);
    
             try {
                 
                 if (isImmediatelyConnected || key.isConnectable()) {
                     if (channel.finishConnect()) {
                         this.connected.add(nodeId);
                         this.sensors.connectionCreated.record();
    
                         SocketChannel socketChannel = (SocketChannel) key.channel();
                         log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                 socketChannel.socket().getReceiveBufferSize(),
                                 socketChannel.socket().getSendBufferSize(),
                                 socketChannel.socket().getSoTimeout(),
                                 nodeId);
                     } else {
                         continue;
                     }
                 }
    
                 
                 if (channel.isConnected() && !channel.ready()) {
                     channel.prepare();
                     if (channel.ready()) {
                         long readyTimeMs = time.milliseconds();
                         boolean isReauthentication = channel.successfulAuthentications() > 1;
                         if (isReauthentication) {
                             sensors.successfulReauthentication.record(1.0, readyTimeMs);
                             if (channel.reauthenticationLatencyMs() == null)
                                 log.warn(
                                     "Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
                             else
                                 sensors.reauthenticationLatency
                                      .record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
                         } else {
                             sensors.successfulAuthentication.record(1.0, readyTimeMs);
                             if (!channel.connectedClientSupportsReauthentication())
                                 sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
                         }
                         log.debug("Successfully {}authenticated with {}", isReauthentication ?
                             "re-" : "", channel.socketDescription());
                     }
                 }
                 if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
                     channel.state(ChannelState.READY);
                  Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
                 responseReceivedDuringReauthentication.ifPresent(receive -> {
                     long currentTimeMs = time.milliseconds();
                     addToCompletedReceives(channel, receive, currentTimeMs);
                 });
    
                 //if channel is ready and has bytes to read from socket or buffer, and has no
                 //previous completed receive then read from it
                 if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
                         && !explicitlyMutedChannels.contains(channel)) {
                     attemptRead(channel);
                 }
    
                 if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
                     //this channel has bytes enqueued in intermediary buffers that we could not read
                     //(possibly because no memory). it may be the case that the underlying socket will
                     //not come up in the next poll() and so we need to remember this channel for the
                     //next poll call otherwise data may be stuck in said buffers forever. If we attempt
                     //to process buffered data and no progress is made, the channel buffered status is
                     //cleared to avoid the overhead of checking every time.
                     keysWithBufferedRead.add(key);
                 }
    
                 
    
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
                 } catch (Exception e) {
                     sendFailed = true;
                     throw e;
                 }
    
                 
                 if (!key.isValid())
                     close(channel, CloseMode.GRACEFUL);
    
             } catch (Exception e) {
                 String desc = channel.socketDescription();
                 if (e instanceof IOException) {
                     log.debug("Connection with {} disconnected", desc, e);
                 } else if (e instanceof AuthenticationException) {
                     boolean isReauthentication = channel.successfulAuthentications() > 0;
                     if (isReauthentication)
                         sensors.failedReauthentication.record();
                     else
                         sensors.failedAuthentication.record();
                     String exceptionMessage = e.getMessage();
                     if (e instanceof DelayedResponseAuthenticationException)
                         exceptionMessage = e.getCause().getMessage();
                     log.info("Failed {}authentication with {} ({})", isReauthentication ? "re-" : "",
                         desc, exceptionMessage);
                 } else {
                     log.warn("Unexpected error from {}; closing connection", desc, e);
                 }
    
                 if (e instanceof DelayedResponseAuthenticationException)
                     maybeDelayCloseOnAuthenticationFailure(channel);
                 else
                      close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
         }
     }
    
  6. Selector.java#attemptRead() reads network data from the connection and calls Selector.java#addToCompletedReceives() to stage the completed receive in the buffer

        private void attemptRead(KafkaChannel channel) throws IOException {
         String nodeId = channel.id();
    
         long bytesReceived = channel.read();
         if (bytesReceived != 0) {
             long currentTimeMs = time.milliseconds();
             sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
             madeReadProgressLastPoll = true;
    
             NetworkReceive receive = channel.maybeCompleteReceive();
             if (receive != null) {
                 addToCompletedReceives(channel, receive, currentTimeMs);
             }
         }
         if (channel.isMuted()) {
             outOfMemory = true; //channel has muted itself due to memory pressure.
         } else {
             madeReadProgressLastPoll = true;
         }
     }
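Kafka frames every request as a 4-byte size prefix followed by the payload, and channel.maybeCompleteReceive() returns non-null only once the whole frame has arrived, so a request split across several poll() rounds is reassembled transparently. A simplified sketch of that accumulation (FrameReaderSketch and feed() are illustrative names, not Kafka's API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameReaderSketch {
    // Single-frame sketch of NetworkReceive: a 4-byte big-endian size prefix,
    // then that many payload bytes. Kafka resets this state for the next receive.
    private ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload = null;

    /** Feed whatever the socket produced this poll; returns the payload once complete, else null. */
    byte[] feed(ByteBuffer chunk) {
        if (payload == null) {
            while (size.hasRemaining() && chunk.hasRemaining()) size.put(chunk.get());
            if (size.hasRemaining()) return null;            // size prefix still partial
            size.flip();
            payload = ByteBuffer.allocate(size.getInt());    // Kafka allocates from a MemoryPool here
        }
        while (payload.hasRemaining() && chunk.hasRemaining()) payload.put(chunk.get());
        return payload.hasRemaining() ? null : payload.array();
    }

    public static void main(String[] args) {
        byte[] body = "fetch".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body);
        wire.flip();

        // Deliver the frame in two partial reads, as two poll() rounds might.
        FrameReaderSketch reader = new FrameReaderSketch();
        ByteBuffer first = wire.duplicate();
        first.limit(3);                                      // only 3 bytes arrive first
        byte[] r1 = reader.feed(first);
        wire.position(3);
        byte[] r2 = reader.feed(wire);
        System.out.println((r1 == null) + " " + new String(r2, StandardCharsets.UTF_8));
    }
}
```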
    
  7. Back at step 1 of this section, SocketServer.scala#Processor#processCompletedReceives() parses the network data received in the previous step into Kafka requests that the upper-layer handlers can process. Its key points are as follows, and with them the main request-processing flow is complete:

    1. SocketServer.scala#Processor#parseRequestHeader() parses the request header
    2. The RequestChannel.scala#Request constructor takes the network data and builds a Kafka request, internally parsing it according to Kafka's wire protocol
    3. requestChannel.sendRequest() invokes RequestChannel.scala#sendRequest() to place the parsed request on the request queue; the upper-layer KafkaRequestHandler then calls RequestChannel.scala#receiveRequest() to poll the queue and process the request, i.e. the flow of step 15 in section 1
    private def processCompletedReceives(): Unit = {
     selector.completedReceives.forEach { receive =>
       try {
         openOrClosingChannel(receive.source) match {
           case Some(channel) =>
             val header = parseRequestHeader(receive.payload)
             if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
               () => time.nanoseconds()))
               trace(s"Begin re-authentication: $channel")
             else {
               val nowNanos = time.nanoseconds()
               if (channel.serverAuthenticationSessionExpired(nowNanos)) {
                 // be sure to decrease connection count and drop any in-flight responses
                 debug(s"Disconnecting expired channel: $channel : $header")
                 close(channel.id)
                 expiredConnectionsKilledCount.record(null, 1, 0)
               } else {
                 val connectionId = receive.source
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                    channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
    
                 val req = new RequestChannel.Request(processor = id, context = context,
                   startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
    
                 // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
                 // and version. It is done here to avoid wiring things up to the api layer.
                 if (header.apiKey == ApiKeys.API_VERSIONS) {
                   val apiVersionsRequest = req.body[ApiVersionsRequest]
                   if (apiVersionsRequest.isValid) {
                      channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                       apiVersionsRequest.data.clientSoftwareName,
                       apiVersionsRequest.data.clientSoftwareVersion))
                   }
                 }
                 requestChannel.sendRequest(req)
                 selector.mute(connectionId)
                 handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
               }
             }
           case None =>
             // This should never happen since completed receives are processed immediately after `poll()`
             throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
         }
       } catch {
         // note that even though we got an exception, we can assume that receive.source is valid.
         // Issues with constructing a valid receive object were handled earlier
         case e: Throwable =>
           processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
       }
     }
     selector.clearCompletedReceives()
    }
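For reference, a Kafka request begins with a common header: api_key (INT16), api_version (INT16), correlation_id (INT32), and a length-prefixed client_id string (length -1 encodes null). The block below is a simplified hand-rolled parse of that layout; the real parseRequestHeader() delegates to Kafka's versioned RequestHeader parsing, and the class and field names here are illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderParseSketch {
    record Header(short apiKey, short apiVersion, int correlationId, String clientId) {}

    /** Parse the common request header fields from a frame payload. */
    static Header parse(ByteBuffer payload) {
        short apiKey = payload.getShort();        // which API, e.g. 0 = PRODUCE
        short apiVersion = payload.getShort();    // version of that API's schema
        int correlationId = payload.getInt();     // echoed back in the response
        short len = payload.getShort();           // -1 encodes a null client_id
        String clientId = null;
        if (len >= 0) {
            byte[] raw = new byte[len];
            payload.get(raw);
            clientId = new String(raw, StandardCharsets.UTF_8);
        }
        return new Header(apiKey, apiVersion, correlationId, clientId);
    }

    public static void main(String[] args) {
        byte[] id = "producer-1".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(10 + id.length)
            .putShort((short) 0)        // api_key
            .putShort((short) 9)        // api_version
            .putInt(42)                 // correlation_id
            .putShort((short) id.length).put(id);
        buf.flip();
        Header h = parse(buf);
        System.out.println(h.apiKey() + " " + h.correlationId() + " " + h.clientId());
    }
}
```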
    