1. Kafka 集群选举的流程2. Kafka 集群选举的源码分析
2.1 KafkaRaftManager 的初始化准备2.2 KafkaRaftManager 的启动运行2.3 集群选主的流程2.4 选举僵局的处理-回退机制
1. Kafka 集群选举的流程在 Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构 中笔者提到在 KRaft 模式下 Kafka 集群的元数据已经交由 Controller 集群自治,则在分布式环境下必然要涉及到集群节点的交互,包括集群选主、集群元数据同步等。其中 Kafka 集群选举涉及的状态流转如下图所示,关键的请求交互如下:
2. Kafka 集群选举的源码分析Vote
由 Candidate 候选者节点发送,请求其他节点为自己投票BeginQuorumEpoch
由 Leader 节点发送,告知其他节点当前的 Leader 信息EndQuorumEpoch
当前 Leader 退位时发送,触发重新选举Fetch
由 Follower 发送,用于复制 Leader 日志,另外通过 Fetch 请求 Follower 也可以完成对 Leader 的探活
Controller 集群节点交互的这部分其实依赖协调集群信息的 KafkaRaftManager 组件,本文以集群选主的场景切入分析,将集群的运作机制分为以下几个部分:
2.1 KafkaRaftManager 的初始化准备集群组件 KafkaRaftManager 的初始化准备集群组件 KafkaRaftManager 的启动运行集群选主的主流程处理选举僵局的处理
KafkaRaftManager组件初始化的触发点在 KafkaRaftServer 实例的创建过程中,可以看到其初始化过程中的关键处理如下:
调用 RaftManager.scala#buildNetworkChannel() 方法创建底层网络通信组件 KafkaNetworkChannel调用 RaftManager.scala#buildRaftClient() 方法创建集群客户端 KafkaRaftClient创建 RaftIoThread 线程用于不断进行本地网络通信客户端请求响应处理
class KafkaRaftManager[T](
metaProperties: metaProperties,
config: KafkaConfig,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
topicId: Uuid,
time: Time,
metrics: Metrics,
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends RaftManager[T] with Logging {
private val raftConfig = new RaftConfig(config)
private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
this.logIdent = logContext.logPrefix()
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix + "-scheduler")
scheduler.startup()
private val dataDir = createDataDir()
override val replicatedLog: ReplicatedLog = buildmetadataLog()
private val netChannel = buildNetworkChannel()
override val client: KafkaRaftClient[T] = buildRaftClient()
private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
...
}
RaftManager.scala#buildNetworkChannel() 方法的核心为以下两步:
调用 RaftManager.scala#buildNetworkClient() 方法创建网络客户端,可以看到这个方法创建了 NetworkClient 实例用于底层网络连接的监听处理使用上一步创建的 NetworkClient 实例新建 KafkaNetworkChannel 对象,给上层提供集群交互请求的发送入口
private def buildNetworkChannel(): KafkaNetworkChannel = {
val netClient = buildNetworkClient()
new KafkaNetworkChannel(time, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
}
private def buildNetworkClient(): NetworkClient = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerSecurityProtocol,
JaasContext.Type.SERVER,
config,
controllerListenerName,
config.saslMechanismControllerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext
)
val metricGroupPrefix = "raft-channel"
val collectPerConnectionMetrics = false
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
metricGroupPrefix,
Map.empty[String, String].asJava,
collectPerConnectionMetrics,
channelBuilder,
logContext
)
val clientId = s"raft-client-${config.nodeId}"
val maxInflightRequestsPerConnection = 1
val reconnectBackoffMs = 50
val reconnectBackoffMsMs = 500
val discoverBrokerVersions = true
new NetworkClient(
selector,
new ManualmetadataUpdater(),
clientId,
maxInflightRequestsPerConnection,
reconnectBackoffMs,
reconnectBackoffMsMs,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.socketReceiveBufferBytes,
config.quorumRequestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
discoverBrokerVersions,
new ApiVersions,
logContext
)
}
KafkaNetworkChannel 实例创建过程中关键组件也会被创建,其中就包括 RaftSendThread 请求发送线程。RaftSendThread 继承自 InterBrokerSendThread,其用途将在后文分析
class KafkaNetworkChannel(
time: Time,
client: KafkaClient,
requestTimeoutMs: Int,
threadNamePrefix: String
) extends NetworkChannel with Logging {
import KafkaNetworkChannel._
type ResponseHandler = AbstractResponse => Unit
private val correlationIdCounter = new AtomicInteger(0)
private val endpoints = mutable.HashMap.empty[Int, Node]
private val requestThread = new RaftSendThread(
name = threadNamePrefix + "-outbound-request-thread",
networkClient = client,
requestTimeoutMs = requestTimeoutMs,
time = time,
isInterruptible = false
)
}
回到本节步骤1第2步 RaftManager.scala#buildRaftClient() 方法执行,可以看到核心的处理分为两步:
创建 KafkaRaftClient 集群客户端实例调用 KafkaRaftClient.java#initialize() 方法初始化当前节点的 quorum 状态
private def buildRaftClient(): KafkaRaftClient[T] = {
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FilebasedStateStore(new File(dataDir, "quorum-state"))
val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}
val client = new KafkaRaftClient(
recordSerde,
netChannel,
replicatedLog,
quorumStateStore,
time,
metrics,
expirationService,
logContext,
metaProperties.clusterId,
nodeId,
raftConfig
)
client.initialize()
client
}
KafkaRaftClient 集群客户端的构造方法如下,需要关注的点有以下几个:
调用 RaftConfig.java#quorumVoterIds() 方法获取配置文件中controller.quorum.voters属性配置的有选举权的节点的 id 列表创建代表自身在集群中的角色状态的 QuorumState 实例调用KafkaNetworkChannel.java#updateEndpoint() 方法将其它可以投票的节点连接地址信息更新到集群请求发送组件内部
KafkaRaftClient(
RecordSerde serde,
NetworkChannel channel,
RaftMessageQueue messageQueue,
ReplicatedLog log,
QuorumStateStore quorumStateStore,
MemoryPool memoryPool,
Time time,
Metrics metrics,
ExpirationService expirationService,
int fetchMaxWaitMs,
String clusterId,
OptionalInt nodeId,
LogContext logContext,
Random random,
RaftConfig raftConfig
) {
this.serde = serde;
this.channel = channel;
this.messageQueue = messageQueue;
this.log = log;
this.memoryPool = memoryPool;
this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
this.time = time;
this.clusterId = clusterId;
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.raftConfig = raftConfig;
this.snapshotCleaner = new RaftmetadataLogCleanerManager(logger, time, 60000, log::maybeClean);
Set quorumVoterIds = raftConfig.quorumVoterIds();
this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(),
raftConfig.requestTimeoutMs(), random);
this.quorum = new QuorumState(
nodeId,
quorumVoterIds,
raftConfig.electionTimeoutMs(),
raftConfig.fetchTimeoutMs(),
quorumStateStore,
time,
logContext,
random);
this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
// Update the voter endpoints with what's in RaftConfig
Map voterAddresses = raftConfig.quorumVoterConnections();
voterAddresses.entrySet().stream()
.filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
.forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
}
回到本节步骤4第2步,KafkaRaftClient.java#initialize() 方法的核心其实是调用 QuorumState.java#initialize() 方法去初始化当前节点在集群中所处的角色状态
public void initialize() {
quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
long currentTimeMs = time.milliseconds();
if (quorum.isLeader()) {
throw new IllegalStateException("Voter cannot initialize as a Leader");
} else if (quorum.isCandidate()) {
onBecomeCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
onBecomeFollower(currentTimeMs);
}
// When there is only a single voter, become candidate immediately
if (quorum.isVoter()
&& quorum.remoteVoters().isEmpty()
&& !quorum.isCandidate()) {
transitionToCandidate(currentTimeMs);
}
}
QuorumState.java#initialize() 方法比较长,不过核心只有两点:
首先调用 QuorumStateStore#readElectionState() 从本地quorum-state文件读取选举状态记录,此处主要是为了覆盖节点重启的场景根据选举状态初始化节点的集群状态,初次启动的节点将被设置为 UnattachedState 状态
public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {
// We initialize in whatever state we were in on shutdown. If we were a leader
// or candidate, probably an election was held, but we will find out about it
// when we send Vote or BeginEpoch requests.
ElectionState election;
try {
election = store.readElectionState();
if (election == null) {
election = ElectionState.withUnknownLeader(0, voters);
}
} catch (final UncheckedIOException e) {
// For exceptions during state file loading (missing or not readable),
// we could assume the file is corrupted already and should be cleaned up.
log.warn("Clearing local quorum state store after error loading state {}",
store.toString(), e);
store.clear();
election = ElectionState.withUnknownLeader(0, voters);
}
final EpochState initialState;
if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
throw new IllegalStateException("Configured voter set: " + voters
+ " is different from the voter set read from the state file: " + election.voters()
+ ". Check if the quorum configuration is up to date, "
+ "or wipe out the local state file if necessary");
} else if (election.hasVoted() && !isVoter()) {
String localIdDescription = localId.isPresent() ?
localId.getAsInt() + " is not a voter" :
"is undefined";
throw new IllegalStateException("Initialized quorum state " + election
+ " with a voted candidate, which indicates this node was previously "
+ " a voter, but the local id " + localIdDescription);
} else if (election.epoch < logEndOffsetAndEpoch.epoch) {
log.warn("Epoch from quorum-state file is {}, which is " +
"smaller than last written epoch {} in the log",
election.epoch, logEndOffsetAndEpoch.epoch);
initialState = new UnattachedState(
time,
logEndOffsetAndEpoch.epoch,
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
// If we were previously a leader, then we will start out as resigned
// in the same epoch. This serves two purposes:
// 1. It ensures that we cannot vote for another leader in the same epoch.
// 2. It protects the invariant that each record is uniquely identified by
// offset and epoch, which might otherwise be violated if unflushed data
// is lost after restarting.
initialState = new ResignedState(
time,
localId.getAsInt(),
election.epoch,
voters,
randomElectionTimeoutMs(),
Collections.emptyList(),
logContext
);
} else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
initialState = new CandidateState(
time,
localId.getAsInt(),
election.epoch,
voters,
Optional.empty(),
1,
randomElectionTimeoutMs(),
logContext
);
} else if (election.hasVoted()) {
initialState = new VotedState(
time,
election.epoch,
election.votedId(),
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
} else if (election.hasLeader()) {
initialState = new FollowerState(
time,
election.epoch,
election.leaderId(),
voters,
Optional.empty(),
fetchTimeoutMs,
logContext
);
} else {
initialState = new UnattachedState(
time,
election.epoch,
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
}
transitionTo(initialState);
}
至此 KafkaRaftClient 对象的实例化基本结束,回到本节步骤1第3步,RaftIoThread 的定义比较简单,可以看到核心的方法 doWork() 方法其实就是调用 KafkaRaftClien.java#poll(),这部分后文将详细分析,至此集群组件 KafkaRaftManager 的初始化结束
class RaftIoThread(
client: KafkaRaftClient[_],
threadNamePrefix: String
) extends ShutdownableThread(
name = threadNamePrefix + "-io-thread",
isInterruptible = false
) {
override def doWork(): Unit = {
client.poll()
}
......
}
KafkaRaftManager组件启动运行的触发点在 KafkaRaftServer 实例的启动过程中,KafkaRaftManager.scala#startup() 方法的关键如下:
调用 KafkaNetworkChannel.scala#start() 启动集群交互请求发送组件调用 RaftIoThread.scala#start() 方法启动线程,用于不断进行本地网络通信客户端请求响应处理
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get()
for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
voterAddressEntry.getValue match {
case spec: InetAddressSpec =>
netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
case _: UnknownAddressSpec =>
logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
case invalid: AddressSpec =>
logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
s"destination ID: ${voterAddressEntry.getKey}")
}
}
netChannel.start()
raftIoThread.start()
}
KafkaNetworkChannel.scala#start() 方法其实就是启动 RaftSendThread 线程,这个线程类继承于 InterBrokerSendThread,InterBrokerSendThread 又继承于ShutdownableThread,下文将进行分析其工作机制
def start(): Unit = {
requestThread.start()
}
RaftIoThread.scala#start() 方法比较关键,从流程上看,此处实际上是整个 KafkaRaftManager 组件开始工作的入口。RaftIoThread 间接继承了 Thread,实际上线程启动后应该会调用 RaftIoThread.scala#run() 方法,而这个方法由其父类 ShutdownableThread.scala#run()实现
可以看到, ShutdownableThread.scala#run() 的核心逻辑其实是循环调用子类 RaftIoThread.scala#doWork() 方法
override def run(): Unit = {
isStarted = true
info("Starting")
try {
while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if (isRunning)
error("Error due to", e)
} finally {
shutdownComplete.countDown()
}
info("Stopped")
}
RaftIoThread.scala#doWork() 方法非常简明,重要逻辑就是调用 KafkaRaftClient.java#poll() 方法开始进行集群节点间的交互,至此核心组件的启动告一段落
override def doWork(): Unit = {
client.poll()
}
KafkaRaftClient.java#poll() 方法源码简单易懂,核心在于以下几步:
首先调用 KafkaRaftClient.java#pollCurrentState() 方法根据节点当前状态进行下一步处理,推动集群状态的变迁通过 messageQueue.poll() 轮询消息队列,取出消息队列中的消息调用 KafkaRaftClient.java#handleInboundMessage() 进行处理。需注意,此处消息队列实际只会存储两种消息,一种是来自其他节点的 RaftRequest.Inbound 请求,另一种是其他节点对本节点的请求作出的 RaftResponse.Inbound 响应
public void poll() {
pollListeners();
long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}
long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
RaftMessage message = messageQueue.poll(pollTimeoutMs);
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
if (message != null) {
handleInboundMessage(message, currentTimeMs);
}
}
KafkaRaftClient.java#pollCurrentState() 方法是节点集群状态流转的重点,上文中我们提到初次启动的节点都处于 UnattachedState 状态,则此处将调用 KafkaRaftClient.java#pollUnattached() 方法
private long pollCurrentState(long currentTimeMs) {
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
} else if (quorum.isCandidate()) {
return pollCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
return pollFollower(currentTimeMs);
} else if (quorum.isVoted()) {
return pollVoted(currentTimeMs);
} else if (quorum.isUnattached()) {
return pollUnattached(currentTimeMs);
} else if (quorum.isResigned()) {
return pollResigned(currentTimeMs);
} else {
throw new IllegalStateException("Unexpected quorum state " + quorum);
}
}
KafkaRaftClient.java#pollUnattached() 方法的关键逻辑如下:
首先判断当前节点是否有 Controller 角色,以及是否在属性 controller.quorum.voters 配置的有选举权的节点列表中,条件成立的话则调用 KafkaRaftClient.java#pollUnattachedAsVoter() 方法KafkaRaftClient.java#pollUnattachedAsVoter() 方法的处理非常简单,可以看到核心其实是调用 UnattachedState#hasElectionTimeoutExpired() 方法判断当前节点的选举时间是否到达,如果是的话则调用 KafkaRaftClient.java#transitionToCandidate() 方法将当前节点的角色切换为候选者,切换的过程中当前节点会给自己投一票
private long pollUnattached(long currentTimeMs) {
UnattachedState state = quorum.unattachedStateOrThrow();
if (quorum.isVoter()) {
return pollUnattachedAsVoter(state, currentTimeMs);
} else {
return pollUnattachedAsObserver(state, currentTimeMs);
}
}
private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If shutting down, then remain in this state until either the
// shutdown completes or an epoch bump forces another state transition
return shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
transitionToCandidate(currentTimeMs);
return 0L;
} else {
return state.remainingElectionTimeMs(currentTimeMs);
}
}
节点状态变迁为候选者后,下一轮 KafkaRaftClient.java#poll() 方法调用最终将触发 KafkaRaftClient.java#pollCandidate() 方法,这个方法的几个异常分支我们暂且不管,先看下 KafkaRaftClient.java#maybeSendVoteRequests() 向其他节点发起 Vote 投票请求的处理
private long pollCandidate(long currentTimeMs) {
CandidateState state = quorum.candidateStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If we happen to shutdown while we are a candidate, we will continue
// with the current election until one of the following conditions is met:
// 1) we are elected as leader (which allows us to resign)
// 2) another leader is elected
// 3) the shutdown timer expires
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
logger.info("Re-elect as candidate after election backoff has completed");
transitionToCandidate(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again",
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);
return backoffDurationMs;
} else {
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
}
}
KafkaRaftClient.java#maybeSendVoteRequests() 方法源码如下,可以看到经过层层检查,Vote 请求会被发送给每一个有投票权的节点,最终调用到的方法的核心逻辑如下:
首先生成 RaftRequest.Outbound 请求出站对象,并且设置这个请求完成后的回调方法,可以看到其回调中会把对端的响应包装成 RaftResponse.Inbound 响应入站对象,并通过 messageQueue.add() 调用将响应投入到本节步骤1第2步提到的消息队列中调用 KafkaNetworkChannel.scala#send() 方法将出站请求投入到队列中
private long maybeSendVoteRequests(
CandidateState state,
long currentTimeMs
) {
// Continue sending Vote requests as long as we still have a chance to win the election
if (!state.isVoteRejected()) {
return maybeSendRequests(
currentTimeMs,
state.unrecordedVoters(),
this::buildVoteRequest
);
}
return Long.MAX_VALUE;
}
private long maybeSendRequests(
long currentTimeMs,
Set destinationIds,
Supplier requestSupplier
) {
long minBackoffMs = Long.MAX_VALUE;
for (Integer destinationId : destinationIds) {
long backoffMs = maybeSendRequest(currentTimeMs, destinationId, requestSupplier);
if (backoffMs < minBackoffMs) {
minBackoffMs = backoffMs;
}
}
return minBackoffMs;
}
private long maybeSendRequest(
long currentTimeMs,
int destinationId,
Supplier requestSupplier
) {
ConnectionState connection = requestManager.getOrCreate(destinationId);
if (connection.isBackingOff(currentTimeMs)) {
long remainingBackoffMs = connection.remainingBackoffMs(currentTimeMs);
logger.debug("Connection for {} is backing off for {} ms", destinationId, remainingBackoffMs);
return remainingBackoffMs;
}
if (connection.isReady(currentTimeMs)) {
int correlationId = channel.newCorrelationId();
ApiMessage request = requestSupplier.get();
RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
correlationId,
request,
destinationId,
currentTimeMs
);
requestMessage.completion.whenComplete((response, exception) -> {
if (exception != null) {
ApiKeys api = ApiKeys.forId(request.apiKey());
Errors error = Errors.forException(exception);
ApiMessage errorResponse = RaftUtil.errorResponse(api, error);
response = new RaftResponse.Inbound(
correlationId,
errorResponse,
destinationId
);
}
messageQueue.add(response);
});
channel.send(requestMessage);
logger.trace("Sent outbound request: {}", requestMessage);
connection.onRequestSent(correlationId, currentTimeMs);
return Long.MAX_VALUE;
}
return connection.remainingRequestTimeMs(currentTimeMs);
}
KafkaNetworkChannel.scala#send() 方法的实现如下,核心动作是调用 RaftSendThread.scala#sendRequest() 方法将请求投入到发送队列。需注意,此处设置了异步请求的回调函数,请求完成拿到对端响应后将一路回调,通知到本节步骤5第1步
override def send(request: RaftRequest.Outbound): Unit = {
def completeFuture(message: ApiMessage): Unit = {
val response = new RaftResponse.Inbound(
request.correlationId,
message,
request.destinationId
)
request.completion.complete(response)
}
def onComplete(clientResponse: ClientResponse): Unit = {
val response = if (clientResponse.versionMismatch != null) {
error(s"Request $request failed due to unsupported version error",
clientResponse.versionMismatch)
errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
} else if (clientResponse.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
// Note that `BrokerToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
error(s"Request $request failed due to authentication error",
clientResponse.authenticationException)
errorResponse(request.data, Errors.NETWORK_EXCEPTION)
} else if (clientResponse.wasDisconnected()) {
errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
} else {
clientResponse.responseBody.data
}
completeFuture(response)
}
endpoints.get(request.destinationId) match {
case Some(node) =>
requestThread.sendRequest(RequestAndCompletionHandler(
request.createdTimeMs,
destination = node,
request = buildRequest(request.data),
handler = onComplete
))
case None =>
completeFuture(errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE))
}
}
RaftSendThread.scala#sendRequest() 方法只是个请求入队的操作,在 2.2节步骤2 我们提到了RaftSendThread 的继承结构,则可以知道这个线程启动后,核心逻辑在于 InterBrokerSendThread.scala#doWork()
def sendRequest(request: RequestAndCompletionHandler): Unit = {
queue.add(request)
wakeup()
}
InterBrokerSendThread.scala#doWork() 方法的处理关键是触发 InterBrokerSendThread.scala#pollonce() 方法,这个方法的核心如下:
调用 InterBrokerSendThread.scala#drainGeneratedRequests() 方法生成协议要求的请求对象调用 InterBrokerSendThread.scala#sendRequests() 方法将请求投入到底层网络客户端 NetworkClient 的发送缓冲区调用 NetworkClient.java#poll() 方法触发底层网络数据的收发
override def doWork(): Unit = {
pollOnce(Long.MaxValue)
}
protected def pollOnce(maxTimeoutMs: Long): Unit = {
try {
drainGeneratedRequests()
var now = time.milliseconds()
val timeout = sendRequests(now, maxTimeoutMs)
networkClient.poll(timeout, now)
now = time.milliseconds()
checkDisconnects(now)
failExpiredRequests(now)
unsentRequests.clean()
} catch {
case _: DisconnectException if !networkClient.active() =>
// DisconnectException is expected when NetworkClient#initiateClose is called
case e: FatalExitError => throw e
case t: Throwable =>
error(s"unhandled exception caught in InterBrokerSendThread", t)
// rethrow any unhandled exceptions as FatalExitError so the JVM will be terminated
// as we will be in an unknown state with potentially some requests dropped and not
// being able to make progress. Known and expected Errors should have been appropriately
// dealt with already.
throw new FatalExitError()
}
}
InterBrokerSendThread.scala#drainGeneratedRequests() 方法的关键处理分为两步:
调用子类 RaftSendThread.scala#generateRequests() 将上层发送队列中的请求数据存入 Buffer 对象调用 NetworkClient.java#newClientRequest() 方法将上层请求转化为网络客户端请求对象 ClientRequest,并将其存入集合
private def drainGeneratedRequests(): Unit = {
generateRequests().foreach { request =>
unsentRequests.put(request.destination,
networkClient.newClientRequest(
request.destination.idString,
request.request,
request.creationTimeMs,
true,
requestTimeoutMs,
request.handler
))
}
}
RaftSendThread.scala#generateRequests() 的处理简单明了,不再赘述
def generateRequests(): Iterable[RequestAndCompletionHandler] = {
val buffer = mutable.Buffer[RequestAndCompletionHandler]()
while (true) {
val request = queue.poll()
if (request == null) {
return buffer
} else {
buffer += request
}
}
buffer
}
回到本节步骤8第2步, InterBrokerSendThread.scala#sendRequests() 方法的处理其实就是从未发送请求的集合中取出请求,再通过 NetworkClient.java#send() 方法将其存入网络客户端的发送缓冲区,笔者在Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析 中详细分析过底层网络客户端的运行机制,此处不再赘述
private def sendRequests(now: Long, maxTimeoutMs: Long): Long = {
var pollTimeout = maxTimeoutMs
for (node <- unsentRequests.nodes.asScala) {
val requestIterator = unsentRequests.requestIterator(node)
while (requestIterator.hasNext) {
val request = requestIterator.next
if (networkClient.ready(node, now)) {
networkClient.send(request, now)
requestIterator.remove()
} else
pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
}
}
pollTimeout
}
至此 Vote 投票请求 已经发送出去,对端节点接收到请求后经过一系列分发将会触发 KafkaRaftClient.java#handleVoteRequest() 方法,可以看到其处理核心如下:
首先校验请求携带过来的参数,如果不合法直接响应返回其次比较请求携带过来的集群版本 epoch 和本地 epoch,如果本地版本更小,则说明当前节点已经落后,需要切换到 UnattachedState 状态,需注意,这是回退机制的重要基础最后比较请求携带的日志尾部 Offset 和本地 Offset,并根据本节点当前的角色状态决定投票结果。如果当前节点投了赞成票且自身处于 UnattachedState 状态则切换到 VoedState 状态
private VoteResponseData handleVoteRequest(
RaftRequest.Inbound requestmetadata
) {
VoteRequestData request = (VoteRequestData) requestmetadata.data;
if (!hasValidClusterId(request.clusterId())) {
return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
if (!hasValidTopicPartition(request, log.topicPartition())) {
// Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
}
VoteRequestData.PartitionData partitionRequest =
request.topics().get(0).partitions().get(0);
int candidateId = partitionRequest.candidateId();
int candidateEpoch = partitionRequest.candidateEpoch();
int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
return buildVoteResponse(Errors.INVALID_REQUEST, false);
}
Optional errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
if (errorOpt.isPresent()) {
return buildVoteResponse(errorOpt.get(), false);
}
if (candidateEpoch > quorum.epoch()) {
transitionToUnattached(candidateEpoch);
}
OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);
if (voteGranted && quorum.isUnattached()) {
transitionToVoted(candidateId, candidateEpoch);
}
logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
return buildVoteResponse(Errors.NONE, voteGranted);
}
对端节点处理完 Vote 请求,其响应将被本节步骤5提到的请求完成回调投入到消息队列中,并最终触发当前节点的 KafkaRaftClient.java#handleVoteResponse() 方法,这个方法的核心流程如下:
如果对端节点投的是赞成票,则调用 CandidateState#recordGrantedVote() 计票,并调用 KafkaRaftClient.java#maybeTransitionToLeader() 检查得票数是否超过有投票权的节点数的一半,如果条件成立则当前节点切换到 LeaderState 状态,成为集群的 Leader如果对端节点投的是反对票,同样记录下来,并检查反对票是否已经过半,如是则调用 CandidateState#startBackingOff() 方法通过回退避免多个候选者的选举僵局
private boolean handleVoteResponse(
RaftResponse.Inbound responsemetadata,
long currentTimeMs
) {
int remoteNodeId = responsemetadata.sourceId();
VoteResponseData response = (VoteResponseData) responsemetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode());
if (topLevelError != Errors.NONE) {
return handleTopLevelError(topLevelError, responsemetadata);
}
if (!hasValidTopicPartition(response, log.topicPartition())) {
return false;
}
VoteResponseData.PartitionData partitionResponse =
response.topics().get(0).partitions().get(0);
Errors error = Errors.forCode(partitionResponse.errorCode());
OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
int responseEpoch = partitionResponse.leaderEpoch();
Optional handled = maybeHandleCommonResponse(
error, responseLeaderId, responseEpoch, currentTimeMs);
if (handled.isPresent()) {
return handled.get();
} else if (error == Errors.NONE) {
if (quorum.isLeader()) {
logger.debug("Ignoring vote response {} since we already became leader for epoch {}",
partitionResponse, quorum.epoch());
} else if (quorum.isCandidate()) {
CandidateState state = quorum.candidateStateOrThrow();
if (partitionResponse.voteGranted()) {
state.recordGrantedVote(remoteNodeId);
maybeTransitionToLeader(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);
// If our vote is rejected, we go immediately to the random backoff. This
// ensures that we are not stuck waiting for the election timeout when the
// vote has become gridlocked.
if (state.isVoteRejected() && !state.isBackingOff()) {
logger.info("Insufficient remaining votes to become leader (rejected by {}). " +
"We will backoff before retrying election again", state.rejectingVoters());
state.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(state.retries())
);
}
}
} else {
logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
partitionResponse, quorum.epoch());
}
return true;
} else {
return handleUnexpectedError(error, responsemetadata);
}
}
读者可以想象一下如果集群中多个可投票节点同时启动,并且都是初次启动,那么很可能这些节点都会切换到候选者状态。此时它们即便收到了其他候选者的 Vote 投票请求,也不会为其他候选者投票,选举就陷入了失败的僵局。对于这种情况, Kafka 引入了回退机制进行处理,大致流程如下图所示:
回退机制的核心在于使用 controller.quorum.election.backoff.max.ms 配置设置一个随机的回退超时时间,KafkaRaftClient.java#pollCandidate() 方法会检查候选者节点是否处于回退状态,回退状态的候选者将不再发送 Vote 请求。一旦回退的超时时间到达,最早退出回退状态的候选者节点将重新发起 Vote 投票,此时投票请求中携带的集群 epoch 增加了一个版本,收到请求的其他候选者会因为版本落后而回退到 UnattachedState 状态,此时可以顺利地投赞成票,选举僵局解除



