Preface
1. How the Controller Processes Requests
2. Source Code Analysis
2.1 Event Generation
2.2 Event Consumption
2.3 Partition Assignment During Topic Creation
2.4 Handling the Results of Business Execution
Preface

In Kafka 3.0 源码笔记(5)-Kafka 服务端 Controller 集群选举的流程, I analyzed in detail the leader election flow that runs when the Controller cluster starts up. Once the leader of the Controller cluster is determined, that node must provide services, the most important of which is accepting requests and maintaining the cluster metadata. This article uses the most common scenario, Topic creation, to analyze how the Controller operates; partition replica leader election is also involved, so the reader can get a clear picture of how partitions are assigned when a Topic is created.
1. How the Controller Processes Requests

Requests that change cluster metadata, such as creating a Topic, are all handled in KRaft mode by the leader node of the Kafka Controller cluster. The Kafka source code provides a complete, unified asynchronous processing framework for this kind of request; the rough flow is as follows:
1. Asynchronous event generation: ControllerApis.scala dispatches the topic-creation request to QuorumController.java, which wraps the business logic into an asynchronous ControllerWriteEvent and delivers the event to the event queue KafkaEventQueue.java.
2. Asynchronous event consumption: the event processor EventHandler consumes events from KafkaEventQueue.java, triggering the business logic wrapped in the ControllerWriteEvent; in this article that logic is mainly the partition assignment performed during topic creation.
3. Handling of execution results: the outcome of the business processing needs follow-up handling, including returning a response to the requester and writing the cluster metadata changes to the internal topic (__cluster_metadata).
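Before diving into the source, the skeleton of this framework can be illustrated with a minimal, self-contained sketch. MiniEventQueue below is a hypothetical simplification invented for this article, not a Kafka class; it only shows the enqueue-consume-complete shape that sections 2.1 through 2.4 trace through the real code.

import java.util.concurrent.*;
import java.util.function.Supplier;

// Hypothetical miniature of the Controller event pipeline: requests become
// queued events; a single worker thread runs the business logic; a
// CompletableFuture carries the result back to the request handler.
public class MiniEventQueue {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(() -> {
        try {
            while (true) {
                queue.take().run();   // consume events one at a time, in order
            }
        } catch (InterruptedException ignored) { }
    });

    public MiniEventQueue() { worker.setDaemon(true); worker.start(); }

    // Analogous in spirit to appendWriteEvent(): wrap the business logic in an
    // event, enqueue it, and hand the caller a future to attach callbacks to.
    public <T> CompletableFuture<T> appendWriteEvent(Supplier<T> op) {
        CompletableFuture<T> future = new CompletableFuture<>();
        queue.add(() -> {
            try {
                future.complete(op.get());     // run business logic on the worker
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        MiniEventQueue q = new MiniEventQueue();
        // The caller never blocks: it reacts when the event has been consumed.
        q.appendWriteEvent(() -> "topic created")
         .whenComplete((r, e) -> System.out.println("response: " + r))
         .get();   // only for the demo, so the JVM waits for the callback
    }
}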
2. Source Code Analysis

2.1 Event Generation

After a client request reaches the ControllerServer on the Kafka server side, the underlying network components parse the protocol and convert it into an upper-layer Request, which is then dispatched to ControllerApis.scala#handle() for business-level routing. For a CreateTopics request the handler is ControllerApis.scala#handleCreateTopics(), whose core logic is as follows:
1. First, the AuthHelper component performs the necessary authorization checks.
2. ControllerApis.scala#createTopics() is called to dispatch the request, yielding an asynchronous CompletableFuture task object.
3. The CompletableFuture object is retained, and CompletableFuture#whenComplete() is used to register the follow-up processing for when the asynchronous task completes. As the code shows, the main completion step is a call to RequestHelper.scala#sendResponseMaybeThrottle(), which sends the processing result back to the requester (a plain-JDK miniature of this completion pattern follows the code).
def handleCreateTopics(request: RequestChannel.Request): Unit = {
val createTopicsRequest = request.body[CreateTopicsRequest]
val future = createTopics(createTopicsRequest.data(),
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity))
future.whenComplete { (result, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
if (exception != null) {
createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
} else {
result.setThrottleTimeMs(throttleTimeMs)
new CreateTopicsResponse(result)
}
})
}
}
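The completion hook above is plain CompletableFuture behavior from the JDK. A tiny standalone equivalent of the pattern, with the class name and strings invented for illustration:

import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future =
            CompletableFuture.supplyAsync(() -> "CreateTopicsResponse");
        // Mirrors handleCreateTopics(): one callback handles both outcomes,
        // choosing an error response if an exception surfaced, otherwise
        // forwarding the successful result to the requester.
        future.whenComplete((result, exception) -> {
            if (exception != null) {
                System.out.println("send error response: " + exception.getMessage());
            } else {
                System.out.println("send response: " + result);
            }
        }).join();
    }
}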
The processing in ControllerApis.scala#createTopics() is fairly clear; the key steps are:

1. Topics whose names appear more than once in the request's topic list are filtered out first, which determines the list of topics that actually need to be created (the duplicate-detection idiom is illustrated with a standalone sketch after the code below).
2. The interface method Controller.java#createTopics() is called for the next stage of processing; its implementation is QuorumController.java#createTopics().
def createTopics(request: CreateTopicsRequestData,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[CreateTopicsResponseData] = {
val topicNames = new util.HashSet[String]()
val duplicateTopicNames = new util.HashSet[String]()
request.topics().forEach { topicData =>
if (!duplicateTopicNames.contains(topicData.name())) {
if (!topicNames.add(topicData.name())) {
topicNames.remove(topicData.name())
duplicateTopicNames.add(topicData.name())
}
}
}
val authorizedTopicNames = if (hasClusterAuth) {
topicNames.asScala
} else {
getCreatableTopics.apply(topicNames.asScala)
}
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
while (iterator.hasNext) {
val creatableTopic = iterator.next()
if (duplicateTopicNames.contains(creatableTopic.name()) ||
!authorizedTopicNames.contains(creatableTopic.name())) {
iterator.remove()
}
}
controller.createTopics(effectiveRequest).thenApply { response =>
duplicateTopicNames.forEach { name =>
response.topics().add(new CreatableTopicResult().
setName(name).
setErrorCode(INVALID_REQUEST.code).
setErrorMessage("Duplicate topic name."))
}
topicNames.forEach { name =>
if (!authorizedTopicNames.contains(name)) {
response.topics().add(new CreatableTopicResult().
setName(name).
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
}
}
response
}
}
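The two-set idiom at the top of createTopics() is worth a second look: a name's first occurrence enters topicNames, and a second occurrence removes it from topicNames and parks it in duplicateTopicNames, so a duplicated name ends up in neither the authorized set nor the effective request. A standalone sketch of the same idiom, with topic names invented for the example:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateNameCheck {
    public static void main(String[] args) {
        List<String> requested = List.of("orders", "payments", "orders");
        Set<String> names = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (String name : requested) {
            if (!duplicates.contains(name)) {
                // add() returns false when the name was already present:
                // move it out of the creatable set and into the duplicate set
                if (!names.add(name)) {
                    names.remove(name);
                    duplicates.add(name);
                }
            }
        }
        System.out.println(names);      // [payments]  -> will be created
        System.out.println(duplicates); // [orders]    -> rejected as INVALID_REQUEST
    }
}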
The implementation of QuorumController.java#createTopics() is shown below; the core of its processing is a call to QuorumController.java#appendWriteEvent() to create the event:

1. A lambda over ReplicationControlManager.java#createTopics() serves as the ControllerWriteOperation interface implementation, wrapping up the business logic, and QuorumController.java#appendWriteEvent() is called to create the event.
2. QuorumController.java#appendWriteEvent() first builds a ControllerWriteEvent object from the method arguments.
3. It then calls KafkaEventQueue.java#appendWithDeadline() to deliver the newly created event to the event queue.
@Override
public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
    if (request.topics().isEmpty()) {
        return CompletableFuture.completedFuture(new CreateTopicsResponseData());
    }
    return appendWriteEvent("createTopics",
        time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
        () -> replicationControl.createTopics(request));
}

private <T> CompletableFuture<T> appendWriteEvent(String name,
                                                  long deadlineNs,
                                                  ControllerWriteOperation<T> op) {
    ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
    queue.appendWithDeadline(deadlineNs, event);
    return event.future();
}
KafkaEventQueue.java#appendWithDeadline() is provided by the interface default method EventQueue#appendWithDeadline(), which ultimately calls KafkaEventQueue.java#enqueue() to put the event on the queue. The key processing is shown below (a paraphrased sketch of the default method follows the code); with this, producing and enqueueing the event is essentially done:

1. The asynchronous event is wrapped in an EventContext object.
2. EventHandler#enqueue is called to add the new EventContext object to the queue of pending events.
@Override
public void enqueue(EventInsertionType insertionType,
String tag,
Function<OptionalLong, OptionalLong> deadlineNsCalculator,
Event event) {
EventContext eventContext = new EventContext(event, insertionType, tag);
Exception e = eventHandler.enqueue(eventContext, deadlineNsCalculator);
if (e != null) {
eventContext.completeWithException(e);
}
}
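For reference, the interface default method mentioned above essentially freezes the caller's deadline into the deadlineNsCalculator function before delegating to enqueue(). Paraphrased from the 3.0 sources; treat this as a sketch rather than a verbatim quote:

// EventQueue#appendWithDeadline, paraphrased: the APPEND insertion type puts
// the event at the tail of the queue, and the fixed deadline is wrapped in a
// function that ignores any previously computed deadline.
default void appendWithDeadline(long deadlineNs, Event event) {
    enqueue(EventInsertionType.APPEND, null, __ -> OptionalLong.of(deadlineNs), event);
}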
2.2 Event Consumption

In the previous section the event was delivered into the queue; consuming it is the job of the EventHandler event processor. EventHandler implements the Runnable interface and is started when the KafkaEventQueue is created, triggering EventHandler#run(). As the code shows, its core is to execute EventHandler#handleEvents():
@Override
public void run() {
try {
handleEvents();
cleanupEvent.run();
} catch (Throwable e) {
log.warn("event handler thread exiting with exception", e);
}
}
EventHandler#handleEvents() polls the internal queue for EventContext objects in an endless while loop; as soon as it obtains one, it calls EventContext#run() to consume the event:
private void handleEvents() throws InterruptedException {
EventContext toTimeout = null;
EventContext toRun = null;
while (true) {
if (toTimeout != null) {
toTimeout.completeWithTimeout();
toTimeout = null;
} else if (toRun != null) {
toRun.run(log);
toRun = null;
}
lock.lock();
try {
long awaitNs = Long.MAX_VALUE;
Map.Entry<Long, EventContext> entry = deadlineMap.firstEntry();
if (entry != null) {
// Search for timed-out events or deferred events that are ready
// to run.
long now = time.nanoseconds();
long timeoutNs = entry.getKey();
EventContext eventContext = entry.getValue();
if (timeoutNs <= now) {
if (eventContext.insertionType == EventInsertionType.DEFERRED) {
// The deferred event is ready to run. Prepend it to the
// queue. (The value for deferred events is a schedule time
// rather than a timeout.)
remove(eventContext);
toRun = eventContext;
} else {
// not a deferred event, so it is a deadline, and it is timed out.
remove(eventContext);
toTimeout = eventContext;
}
continue;
} else if (closingTimeNs <= now) {
remove(eventContext);
toTimeout = eventContext;
continue;
}
awaitNs = timeoutNs - now;
}
if (head.next == head) {
if ((closingTimeNs != Long.MAX_VALUE) && deadlineMap.isEmpty()) {
// If there are no more entries to process, and the queue is
// closing, exit the thread.
return;
}
} else {
toRun = head.next;
remove(toRun);
continue;
}
if (closingTimeNs != Long.MAX_VALUE) {
long now = time.nanoseconds();
if (awaitNs > closingTimeNs - now) {
awaitNs = closingTimeNs - now;
}
}
if (awaitNs == Long.MAX_VALUE) {
cond.await();
} else {
cond.awaitNanos(awaitNs);
}
} finally {
lock.unlock();
}
}
}
The core of EventContext#run() is the call to Event#run() that triggers task execution; in this article that means triggering ControllerWriteEvent#run():
void run(Logger log) throws InterruptedException {
try {
event.run();
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
try {
event.handleException(e);
} catch (Throwable t) {
log.error("Unexpected exception in handleException", t);
}
}
}
The implementation of ControllerWriteEvent#run() is shown below. This method is central: it defines the Controller's processing framework for every request that changes metadata. With it, the overall logic of event consumption has been covered:

1. The functional interface method ControllerWriteOperation#generateRecordsAndResult() is invoked, triggering the business logic wrapped up in section 2.1; in this article that means executing ReplicationControlManager.java#createTopics().
2. Once the business logic completes, follow-up handling depends on its result. If the result's record list is not empty, the ControllerResult.isAtomic attribute decides how the records are written to the cluster metadata topic; for a topic-creation request, KafkaRaftClient.java#scheduleAtomicAppend() is called here.
3. With the above done, ControllerPurgatory.java#add() registers the current ControllerWriteEvent object as a listener on the advancing metadata offset. When the target offset is reached, ControllerWriteEvent#complete() executes, the CompletableFuture callbacks fire in sequence, and the processing result is finally sent back to the requester as described in section 2.1. A minimal sketch of this purgatory pattern follows the code below.
@Override
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
int controllerEpoch = curClaimEpoch;
if (controllerEpoch == -1) {
throw newNotControllerException();
}
startProcessingTimeNs = Optional.of(now);
ControllerResult<T> result = op.generateRecordsAndResult();
if (result.records().isEmpty()) {
op.processBatchEndOffset(writeOffset);
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
Optional<Long> maybeOffset = purgatory.highestPendingOffset();
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can return immediately.
resultAndOffset = ControllerResultAndOffset.of(-1, result);
log.debug("Completing read-only operation {} immediately because " +
"the purgatory is empty.", this);
complete(null);
return;
}
// If there are operations in the purgatory, we want to wait for the latest
// one to complete before returning our result to the user.
resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
log.debug("Read-only operation {} will be completed when the log " +
"reaches offset {}", this, resultAndOffset.offset());
} else {
// If the operation returned a batch of records, those records need to be
// written before we can return our result to the user. Here, we hand off
// the batch of records to the raft client. They will be written out
// asynchronously.
final long offset;
if (result.isAtomic()) {
offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
writeOffset = offset;
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
replay(message.message(), Optional.empty(), offset);
}
snapshotRegistry.getOrCreateSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
"reaches offset {}.", this, resultAndOffset.offset());
}
purgatory.add(resultAndOffset.offset(), this);
}
2.3 Partition Assignment During Topic Creation

ReplicationControlManager.java#createTopics() is the entry point for topic creation; the key processing is:

1. As before, the topics carried in the request are validated first, including topic-name checks, existence checks, and validation of the new topics' configurations.
2. If validation passes, the topic list is traversed and ReplicationControlManager.java#createTopic() is called to create each topic in turn. Note that the records list is passed in; this collection accumulates the records describing each topic's partition assignment.
3. Finally, ControllerResult#atomicOf() bundles the topic-creation response together with the partition-assignment records and returns them as the result of the business logic.
ControllerResult<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
    Map<String, ApiError> topicErrors = new HashMap<>();
    List<ApiMessageAndVersion> records = new ArrayList<>();

    // Check the topic names.
    validateNewTopicNames(topicErrors, request.topics());

    // Identify topics that already exist and mark them with the appropriate error
    request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
        .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));

    // Verify that the configurations for the new topics are OK, and figure out what
    // ConfigRecords should be created.
    Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
        computeConfigChanges(topicErrors, request.topics());
    ControllerResult<Map<ConfigResource, ApiError>> configResult =
        configurationControl.incrementalAlterConfigs(configChanges);
    for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
        if (entry.getValue().isFailure()) {
            topicErrors.put(entry.getKey().name(), entry.getValue());
        }
    }
    records.addAll(configResult.records());

    // Try to create whatever topics are needed.
    Map<String, CreatableTopicResult> successes = new HashMap<>();
    for (CreatableTopic topic : request.topics()) {
        if (topicErrors.containsKey(topic.name())) continue;
        ApiError error = createTopic(topic, records, successes);
        if (error.isFailure()) {
            topicErrors.put(topic.name(), error);
        }
    }

    // Create responses for all topics (the per-topic success/error assembly is
    // omitted here; it fills a CreateTopicsResponseData from topicErrors and successes).
    CreateTopicsResponseData data = new CreateTopicsResponseData();
    // ...

    return ControllerResult.atomicOf(records, data);
}
The processing in ReplicationControlManager.java#createTopic() falls into two parts:

1. If the request manually specifies a partition assignment plan, the plan is validated; if it passes, the manual plan is used directly for the topic's partition assignment. This code is fairly direct and is not analyzed further.
2. If no partition plan was specified manually, an internal algorithm distributes each of the topic's partitions and their replicas across the brokers, mainly through ClusterControlManager#placeReplicas().

Note that each partition's assignment is stored in a PartitionRegistration object, which keeps the list of brokers holding the partition's replicas and separately records the broker holding the leader replica. As the code shows, the replica on the first broker in the ISR list becomes the leader of the partition's replicas (a worked example follows the code below):
private ApiError createTopic(CreatableTopic topic,
                             List<ApiMessageAndVersion> records,
                             Map<String, CreatableTopicResult> successes) {
    Map<Integer, PartitionRegistration> newParts = new HashMap<>();
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but replication " +
"factor was not set to -1.");
}
if (topic.numPartitions() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
OptionalInt replicationFactor = OptionalInt.empty();
for (CreatableReplicaAssignment assignment : topic.assignments()) {
if (newParts.containsKey(assignment.partitionIndex())) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List isr = assignment.brokerIds().stream().
filter(clusterControl::unfenced).collect(Collectors.toList());
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition assignment for " +
"partition " + assignment.partitionIndex() + " are fenced.");
}
newParts.put(assignment.partitionIndex(), new PartitionRegistration(
Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
Replicas.NONE, Replicas.NONE, isr.get(0), 0, 0));
}
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor was set to an invalid non-positive value.");
} else if (!topic.assignments().isEmpty()) {
return new ApiError(INVALID_REQUEST,
"Replication factor was not set to -1 but a manual partition " +
"assignment was specified.");
} else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
return new ApiError(Errors.INVALID_PARTITIONS,
"Number of partitions was set to an invalid non-positive value.");
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
try {
List<List<Integer>> replicas = clusterControl.
placeReplicas(0, numPartitions, replicationFactor);
for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
int[] r = Replicas.toArray(replicas.get(partitionId));
newParts.put(partitionId,
new PartitionRegistration(r, r, Replicas.NONE, Replicas.NONE, r[0], 0, 0));
}
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" time(s): " + e.getMessage());
}
}
Uuid topicId = Uuid.randomUuid();
successes.put(topic.name(), new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
setErrorCode((short) 0).
setErrorMessage(null).
setNumPartitions(newParts.size()).
setReplicationFactor((short) newParts.get(0).replicas.length));
records.add(new ApiMessageAndVersion(new TopicRecord().
setName(topic.name()).
setTopicId(topicId), TOPIC_RECORD.highestSupportedVersion()));
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex));
}
return ApiError.NONE;
}
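To make the leader choice concrete: suppose a manual assignment puts partition 0 on brokers [2, 0, 1] while broker 2 is currently fenced. The ISR is the assignment filtered down to unfenced brokers in assignment order, and the leader is the first ISR entry. The broker set and fencing state below are invented for the example; only the filtering logic mirrors createTopic():

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class LeaderFromIsrDemo {
    public static void main(String[] args) {
        List<Integer> assignment = List.of(2, 0, 1); // manual replica list for partition 0
        Set<Integer> fenced = Set.of(2);             // broker 2 is fenced right now

        // Same shape as createTopic(): ISR = assigned brokers that are unfenced,
        // preserving the assignment order.
        List<Integer> isr = assignment.stream()
            .filter(b -> !fenced.contains(b))
            .collect(Collectors.toList());

        int leader = isr.get(0); // first ISR member becomes the partition leader
        System.out.println("isr=" + isr + ", leader=" + leader); // isr=[0, 1], leader=0
    }
}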
ClusterControlManager#placeReplicas() is shown below; clearly the core is the call to BrokerHeartbeatManager#placeReplicas():
public List<List<Integer>> placeReplicas(int startPartition, int numPartitions, short numReplicas) {
    if (heartbeatManager == null) {
        throw new RuntimeException("ClusterControlManager is not active.");
    }
    return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas,
        id -> brokerRegistrations.get(id).rack(), replicaPlacer);
}
BrokerHeartbeatManager#placeReplicas() is really just another entry point: the core partition-assignment work is done by ReplicaPlacer#place(), whose concrete implementation is StripedReplicaPlacer#place():
List<List<Integer>> placeReplicas(int startPartition, int numPartitions, short numReplicas,
                                  Function<Integer, Optional<String>> idToRack,
                                  ReplicaPlacer placer) {
    Iterator<UsableBroker> iterator = new UsableBrokerIterator(
        brokers.values().iterator(), idToRack);
    return placer.place(startPartition, numPartitions, numReplicas, iterator);
}
The key processing in StripedReplicaPlacer#place() is:

1. A new RackList object is created to drive the partition assignment.
2. The replication factor setting is validated; if it is larger than the total number of brokers in the cluster, an exception is thrown.
3. The partition list is iterated, and RackList#place() is called to distribute each partition's replicas across the brokers.
@Override
public List<List<Integer>> place(int startPartition,
                                 int numPartitions,
                                 short replicationFactor,
                                 Iterator<UsableBroker> iterator) {
    RackList rackList = new RackList(random, iterator);
    throwInvalidReplicationFactorIfNonPositive(replicationFactor);
    throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
    throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor,
        rackList.numTotalBrokers());
    List<List<Integer>> placements = new ArrayList<>(numPartitions);
    for (int partition = 0; partition < numPartitions; partition++) {
        placements.add(rackList.place(replicationFactor));
    }
    return placements;
}
The implementation of RackList#place() uses member variables (epoch and offset) to record the progress of successive assignments, and uses that state to avoid concentrating the leader replicas of all partitions on a single broker. With this, partition and replica assignment during topic creation comes to an end (a worked example follows the code below):
List<Integer> place(int replicationFactor) {
    throwInvalidReplicationFactorIfNonPositive(replicationFactor);
    throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, numTotalBrokers());
    throwInvalidReplicationFactorIfZero(numUnfencedBrokers());
    // If we have returned as many assignments as there are unfenced brokers in
    // the cluster, shuffle the rack list and broker lists to try to avoid
    // repeating the same assignments again.
    // But don't reset the iteration epoch for a single unfenced broker -- otherwise we would loop forever
    if (epoch == numUnfencedBrokers && numUnfencedBrokers > 1) {
        shuffle();
        epoch = 0;
    }
    if (offset == rackNames.size()) {
        offset = 0;
    }
    List<Integer> brokers = new ArrayList<>(replicationFactor);
    int firstRackIndex = offset;
    while (true) {
        Optional<String> name = rackNames.get(firstRackIndex);
        Rack rack = racks.get(name);
        int result = rack.nextUnfenced(epoch);
        if (result >= 0) {
            brokers.add(result);
            break;
        }
        firstRackIndex++;
        if (firstRackIndex == rackNames.size()) {
            firstRackIndex = 0;
        }
    }
    int rackIndex = offset;
    for (int replica = 1; replica < replicationFactor; replica++) {
        int result = -1;
        do {
            if (rackIndex == firstRackIndex) {
                firstRackIndex = -1;
            } else {
                Optional<String> rackName = rackNames.get(rackIndex);
                Rack rack = racks.get(rackName);
                result = rack.next(epoch);
            }
            rackIndex++;
            if (rackIndex == rackNames.size()) {
                rackIndex = 0;
            }
        } while (result < 0);
        brokers.add(result);
    }
    epoch++;
    offset++;
    return brokers;
}
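The effect of the epoch/offset bookkeeping is easiest to see in a toy example. In a rack-less cluster with three unfenced brokers [0, 1, 2] and replicationFactor = 3, the placement cursor advances after each partition, so successive partitions start their replica list (and therefore their leader) on different brokers. The sketch below is a deliberate simplification that ignores racks, fencing, and shuffling; it only demonstrates the leader-rotation intent of StripedReplicaPlacer:

import java.util.ArrayList;
import java.util.List;

public class StripedPlacementDemo {
    public static void main(String[] args) {
        List<Integer> brokers = List.of(0, 1, 2); // unfenced brokers, single rack
        int replicationFactor = 3;
        int offset = 0; // advances once per partition, rotating the leader

        for (int partition = 0; partition < 4; partition++) {
            List<Integer> replicas = new ArrayList<>(replicationFactor);
            // First replica (index offset) is the leader; the rest follow in order.
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((offset + r) % brokers.size()));
            }
            offset++;
            System.out.println("partition " + partition + " -> " + replicas);
        }
        // partition 0 -> [0, 1, 2]
        // partition 1 -> [1, 2, 0]
        // partition 2 -> [2, 0, 1]
        // partition 3 -> [0, 1, 2]
    }
}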
2.4 Handling the Results of Business Execution

After the business logic finishes, we return to step 2 of the ControllerWriteEvent#run() flow in section 2.2: if the topic-creation request actually produced metadata records, they must be written to the internal topic __cluster_metadata. In this article that triggers KafkaRaftClient.java#scheduleAtomicAppend(), whose core, as the code below shows, is a call to KafkaRaftClient.java#append(). Its key processing:

1. QuorumState#maybeLeaderState() is called first to check the Controller node's state; if the current node is no longer the leader, processing cannot continue.
2. Once the check passes, LeaderState#accumulator() is called to obtain the batch accumulator BatchAccumulator, and BatchAccumulator#appendAtomic() is then called to stage the records.
@Override
public long scheduleAtomicAppend(int epoch, List<T> records) {
    return append(epoch, records, true);
}

private long append(int epoch, List<T> records, boolean isAtomic) {
    LeaderState<T> leaderState = quorum.maybeLeaderState().orElseThrow(
        () -> new NotLeaderException("Append failed because the replication is not the current leader")
    );

    BatchAccumulator<T> accumulator = leaderState.accumulator();
    boolean isFirstAppend = accumulator.isEmpty();
    final long offset;
    if (isAtomic) {
        offset = accumulator.appendAtomic(epoch, records);
    } else {
        offset = accumulator.append(epoch, records);
    }

    // Wakeup the network channel if either this is the first append
    // or the accumulator is ready to drain now. Checking for the first
    // append ensures that we give the IO thread a chance to observe
    // the linger timeout so that it can schedule its own wakeup in case
    // there are no additional appends.
    if (isFirstAppend || accumulator.needsDrain(time.milliseconds())) {
        wakeup();
    }
    return offset;
}
With the records staged, the actual write is triggered asynchronously by KafkaRaftClient.java#poll(). Readers who are unsure where KafkaRaftClient.java#poll() is driven from can refer to Kafka 3.0 源码笔记(5)-Kafka 服务端 Controller 集群选举的流程. Within poll(), the part relevant to this section is the call to KafkaRaftClient.java#pollCurrentState():
public void poll() {
pollListeners();
long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}
long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
RaftMessage message = messageQueue.poll(pollTimeoutMs);
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
if (message != null) {
handleInboundMessage(message, currentTimeMs);
}
}
In KafkaRaftClient.java#pollCurrentState(), since the current node is the leader of the Controller cluster, KafkaRaftClient.java#pollLeader() is invoked to enter the leader's periodic processing. As the following code shows, the flow that matters for this section ends at the KafkaRaftClient.java#maybeAppendBatches() call:
private long pollCurrentState(long currentTimeMs) {
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
} else if (quorum.isCandidate()) {
return pollCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
return pollFollower(currentTimeMs);
} else if (quorum.isVoted()) {
return pollVoted(currentTimeMs);
} else if (quorum.isUnattached()) {
return pollUnattached(currentTimeMs);
} else if (quorum.isResigned()) {
return pollResigned(currentTimeMs);
} else {
throw new IllegalStateException("Unexpected quorum state " + quorum);
}
}
private long pollLeader(long currentTimeMs) {
LeaderState<T> state = quorum.leaderStateOrThrow();
maybeFireLeaderChange(state);
if (shutdown.get() != null || state.isResignRequested()) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
return 0L;
}
long timeUntilFlush = maybeAppendBatches(
state,
currentTimeMs
);
long timeUntilSend = maybeSendRequests(
currentTimeMs,
state.nonAcknowledgingVoters(),
this::buildBeginQuorumEpochRequest
);
return Math.min(timeUntilFlush, timeUntilSend);
}
KafkaRaftClient.java#maybeAppendBatches() is short and clear, with two key steps:

1. state.accumulator() obtains the batch accumulator, and BatchAccumulator#drain() is called to drain the accumulated batches.
2. The drained batches are iterated, and KafkaRaftClient.java#appendBatch() is called for each one to append it to the local log.
private long maybeAppendBatches(
LeaderState<T> state,
long currentTimeMs
) {
long timeUntilDrain = state.accumulator().timeUntilDrain(currentTimeMs);
if (timeUntilDrain <= 0) {
List<BatchAccumulator.CompletedBatch<T>> batches = state.accumulator().drain();
Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator();
try {
while (iterator.hasNext()) {
BatchAccumulator.CompletedBatch<T> batch = iterator.next();
appendBatch(state, batch, currentTimeMs);
}
flushLeaderLog(state, currentTimeMs);
} finally {
// Release and discard any batches which failed to be appended
while (iterator.hasNext()) {
iterator.next().release();
}
}
}
return timeUntilDrain;
}
The core of KafkaRaftClient.java#appendBatch() is the call to KafkaRaftClient.java#appendAsLeader(), which goes through the ReplicatedLog#appendAsLeader() interface to its implementation KafkaMetadataLog.scala#appendAsLeader(). From here the flow enters the local write path for the records; readers unfamiliar with it can refer to Kafka 3.0 源码笔记(7)-Kafka 服务端对客户端的 Produce 请求处理. With that, the analysis in this article is essentially complete:
private void appendBatch(
LeaderState<T> state,
BatchAccumulator.CompletedBatch<T> batch,
long appendTimeMs
) {
try {
int epoch = state.epoch();
LogAppendInfo info = appendAsLeader(batch.data);
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
CompletableFuture<Long> future = appendPurgatory.await(
offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
future.whenComplete((commitTimeMs, exception) -> {
if (exception != null) {
logger.debug("Failed to commit {} records at {}", batch.numRecords, offsetAndEpoch, exception);
} else {
long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
logger.debug("Completed commit of {} records at {}", batch.numRecords, offsetAndEpoch);
batch.records.ifPresent(records -> {
maybeFireHandleCommit(batch.baseOffset, epoch, batch.appendTimestamp(), batch.sizeInBytes(), records);
});
}
});
} finally {
batch.release();
}
}
private LogAppendInfo appendAsLeader(
Records records
) {
LogAppendInfo info = log.appendAsLeader(records, quorum.epoch());
OffsetAndEpoch endOffset = endOffset();
kafkaRaftMetrics.updateAppendRecords(info.lastOffset - info.firstOffset + 1);
kafkaRaftMetrics.updateLogEnd(endOffset);
logger.trace("Leader appended records at base offset {}, new end offset is {}", info.firstOffset, endOffset);
return info;
}