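Preface
In "Kafka 3.0 Source Code Notes (3): Source Analysis of the Kafka Consumer's Core Flow" I described how one of Kafka's clients, the Consumer, pulls messages. "Kafka 3.0 Source Code Notes (2): Source Analysis of Kafka Server Startup and Request Handling" examined, at the server level, how the Kafka server receives requests. This article uses the handling of Fetch requests from clients as its example and gives an overview of how the Kafka server processes a request at the business-logic level.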
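The emphasis on Fetch requests *from clients* is deliberate. In KRaft mode, the Kafka node that leads the Controller quorum also receives Fetch requests from other servers, which it uses to replicate cluster metadata. A completely different component handles those requests.
1. Kafka's file storage structure
Topic
A publish/subscribe message topic and the first, logical layer of message storage. Each topic contains multiple partitions. These are usually spread across different Broker nodes and together form the physical basis of the topic.
Partition
A partition of a topic. Logically, a partition is made up of a set of replicas. The Leader replica stores the messages written by clients, and Follower replicas copy the Leader's data to provide high availability. The replica data structure is Partition, the second layer of Kafka's message storage. Physically, each replica is an ordered message log.
Log
The log structure inside a partition replica, which is in fact a collection of log segments.
LogSegment
The log segment data structure. It holds the file objects that actually store message data, chiefly the message file FileRecords, the offset index file OffsetIndex and the timestamp index file TimeIndex.
2. Processing Fetch requests from clients
When Kafka's BrokerServer receives a Fetch request from a client, it puts the request into the request queue for the upper-layer business handlers, which triggers KafkaApis.scala#handle(). This is the entry method. Its core logic is:
- Dispatch the request by type to the corresponding handler method. Fetch requests from clients are handled by KafkaApis.scala#handleFetchRequest()
- In the finally block, call ReplicaManager.scala#tryCompleteActions() to try to complete any delayed actions produced while requests were being handled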
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
// The socket server will reject APIs which are not exposed in this scope and close the connection
// before handing them to the request handler, so this path should not be exercised in practice
throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
}
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", e)
requestHelper.handleError(request, e)
} finally {
// try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
// are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
// expiration thread for certain delayed operations (e.g. DelayedJoin)
replicaManager.tryCompleteActions()
// The local completion time may be set while processing the request. only record it if it's unset.
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
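The dispatch-then-drain structure of handle() can be sketched in Java as follows. This is a minimal illustration, not Kafka code. ApiKey, MiniApis and the string log are hypothetical stand-ins. The queue only imitates the idea behind replicaManager.tryCompleteActions(): work that one request makes possible (for example, a produce that could satisfy a waiting fetch) is queued and run in the finally block, not inside the handler itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical subset of API keys, for illustration only.
enum ApiKey { PRODUCE, FETCH, LIST_OFFSETS }

final class MiniApis {
    // Actions queued during request handling (stand-in for ReplicaManager's action queue).
    private final Queue<Runnable> delayedActions = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void handle(ApiKey key) {
        try {
            switch (key) {
                case PRODUCE:
                    log.add("produce");
                    // New data may make a pending fetch completable; defer that check.
                    delayedActions.add(() -> log.add("completed delayed fetch"));
                    break;
                case FETCH:
                    log.add("fetch");
                    break;
                default:
                    throw new IllegalStateException("No handler for request api key " + key);
            }
        } catch (IllegalStateException e) {
            log.add("error: " + e.getMessage());
        } finally {
            // Like tryCompleteActions(): drain queued actions after the handler returns or fails.
            Runnable action;
            while ((action = delayedActions.poll()) != null) {
                action.run();
            }
        }
    }
}
```

Because the queued actions run in finally, they execute even when the handler throws. This mirrors the placement of tryCompleteActions() in the Scala code above.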
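KafkaApis.scala#handleFetchRequest() is very long, but its core logic is fairly simple:
- First, work out from the request which partitions the client wants to fetch. Some partitions fail the authorization check (ClusterAction for followers, READ on the topic for consumers) or are missing from the metadata cache; these go into the erroneous list. The rest go into the interesting list
- Call ReplicaManager.scala#fetchMessages() to read message data from the local log files. The nested function KafkaApis.scala#handleFetchRequest()#processResponseCallback() is passed in as the response callback invoked once processing completes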
def handleFetchRequest(request: RequestChannel.Request): Unit = {
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val fetchRequest = request.body[FetchRequest]
val fetchContext = fetchManager.newContext(
fetchRequest.metadata,
fetchRequest.fetchData,
fetchRequest.toForget,
fetchRequest.isFromFollower)
val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
// Fetch API version 11 added preferred replica logic
Some(new DefaultClientMetadata(
fetchRequest.rackId,
clientId,
request.context.clientAddress,
request.context.principal,
request.context.listenerName.value))
} else {
None
}
val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
fetchContext.foreachPartition { (topicPartition, data) =>
if (!metadataCache.contains(topicPartition))
erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += (topicPartition -> data)
}
} else {
fetchContext.foreachPartition { (part, _) =>
erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
}
}
} else {
// Regular Kafka consumers need READ permission on each partition they are fetching.
val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
fetchContext.foreachPartition { (topicPartition, partitionData) =>
partitionDatas += topicPartition -> partitionData
}
val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
partitionDatas.foreach { case (topicPartition, data) =>
if (!authorizedTopics.contains(topicPartition.topic))
erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += (topicPartition -> data)
}
}
......
// for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
// no bytes were recorded in the recent quota window
// trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
Int.MaxValue
else
quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
if (interesting.isEmpty)
processResponseCallback(Seq.empty)
else {
// call the replica manager to fetch messages from the local replica
replicaManager.fetchMessages(
fetchRequest.maxWait.toLong,
fetchRequest.replicaId,
fetchMinBytes,
fetchMaxBytes,
versionId <= 2,
interesting,
replicationQuota(fetchRequest),
processResponseCallback,
fetchRequest.isolationLevel,
clientMetadata)
}
}
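The byte limits computed near the end of handleFetchRequest() are plain arithmetic. The sketch below restates them in Java. The quotaWindowBytes parameter is an assumption: it stands in for the value returned by quotas.fetch.getMaxValueInQuotaWindow(...). Followers bypass that cap, just as the Scala code substitutes Int.MaxValue for them.

```java
final class FetchByteLimits {
    /** Returns {fetchMaxBytes, fetchMinBytes} as derived in handleFetchRequest(). */
    static int[] effectiveLimits(int requestMaxBytes, int requestMinBytes, int brokerFetchMaxBytes,
                                 boolean isFromFollower, int quotaWindowBytes) {
        // Consumers are capped so that a single fetch cannot guarantee throttling.
        int maxQuotaWindowBytes = isFromFollower ? Integer.MAX_VALUE : quotaWindowBytes;
        int fetchMaxBytes = Math.min(Math.min(requestMaxBytes, brokerFetchMaxBytes), maxQuotaWindowBytes);
        // minBytes may not exceed maxBytes, otherwise the fetch could never be satisfied.
        int fetchMinBytes = Math.min(requestMinBytes, fetchMaxBytes);
        return new int[] {fetchMaxBytes, fetchMinBytes};
    }
}
```

For example, suppose a consumer asks for 50 MiB while its quota window allows 10 MiB. The effective fetchMaxBytes is then 10 MiB, whereas a follower making the same request keeps the full 50 MiB.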
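The core processing of ReplicaManager.scala#fetchMessages() is fairly clear. The points to note are:
- It first calls its nested function readFromLog() to perform the actual read. The heart of that function is a call to ReplicaManager.scala#readFromLocalLog()
- Based on the read results, it decides whether to invoke the callback KafkaApis.scala#handleFetchRequest()#processResponseCallback() right away and return data to the requester. It responds immediately if any of the following 5 conditions holds:
  1. The timeout carried by the request is less than or equal to 0, i.e. the requester does not want to wait
  2. The request does not ask for any partition
  3. The server has read enough data (at least fetchMinBytes) to respond
  4. An error occurred while reading data
  5. A diverging leader epoch was detected during the read
- If it cannot respond immediately, it creates a DelayedFetch operation and hands it to delayedFetchPurgatory, where the operation waits to be triggered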
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit = {
val isFromFollower = Request.isValidBrokerId(replicaId)
val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
val fetchIsolation = if (!isFromConsumer)
FetchLogEnd
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
FetchTxnCommitted
else
FetchHighWatermark
// Restrict fetching to leader if request is from follower or from a client with older version (no Clientmetadata)
val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
val result = readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader,
fetchIsolation = fetchIsolation,
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota,
clientMetadata = clientMetadata)
if (isFromFollower) updateFollowerFetchState(replicaId, result)
else result
}
val logReadResults = readFromLog()
// check if this fetch request can be satisfied right away
var bytesReadable: Long = 0
var errorReadingData = false
var hasDivergingEpoch = false
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
logReadResults.foreach { case (topicPartition, logReadResult) =>
brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicPartition, logReadResult)
}
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) we found a diverging epoch
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
}
responseCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
fetchInfos.foreach { case (topicPartition, partitionData) =>
logReadResultMap.get(topicPartition).foreach(logReadResult => {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}
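The two decisions fetchMessages() makes up front can be restated compactly: which isolation bound applies to the read, and whether to answer at once. The Java sketch below is a paraphrase of the Scala above. The enum names are hypothetical renderings of FetchLogEnd, FetchHighWatermark and FetchTxnCommitted.

```java
final class FetchDecision {
    enum FetchIsolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

    // Followers (and future local replicas) may read up to the log end offset.
    // Consumers are bounded by the high watermark, or by the last stable offset under READ_COMMITTED.
    static FetchIsolation isolationFor(boolean isFromConsumer, boolean readCommitted) {
        if (!isFromConsumer) return FetchIsolation.LOG_END;
        return readCommitted ? FetchIsolation.TXN_COMMITTED : FetchIsolation.HIGH_WATERMARK;
    }

    // The five conditions checked before falling back to a DelayedFetch in the purgatory.
    static boolean respondImmediately(long timeoutMs, int requestedPartitions, long bytesReadable,
                                      int fetchMinBytes, boolean errorReadingData, boolean hasDivergingEpoch) {
        return timeoutMs <= 0
                || requestedPartitions == 0
                || bytesReadable >= fetchMinBytes
                || errorReadingData
                || hasDivergingEpoch;
    }
}
```

A consumer that asks for at least 1 byte and is willing to wait 500 ms therefore parks in the purgatory only when nothing at all was readable and no error or divergence was seen.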
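ReplicaManager.scala#readFromLocalLog() is straightforward. It iterates over the requested partitions and calls its nested function ReplicaManager.scala#readFromLocalLog()#read() to read the message data stored in each partition. The key step in that function is the call to Partition.scala#readRecords(). Note also that all partitions share the response budget fetchMaxBytes: limitBytes shrinks after each partition is read, and minOneMessage is cleared as soon as one non-empty partition has been read.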
def readFromLocalLog(replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
val traceEnabled = isTraceEnabled
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
try {
if (traceEnabled)
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
val partition = getPartitionOrException(tp)
val fetchTimeMs = time.milliseconds
// If we are the leader, determine the preferred read-replica
val preferredReadReplica = clientMetadata.flatMap(
metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
if (preferredReadReplica.isDefined) {
replicaSelectorOpt.foreach { selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for $clientMetadata")
}
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = -1L,
lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
preferredReadReplica = preferredReadReplica,
exception = None)
} else {
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val readInfo: LogReadInfo = partition.readRecords(
lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchIsolation = fetchIsolation,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
// If the partition is being throttled, simply return an empty set.
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
readInfo.fetchedData
}
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
lastStableOffset = Some(readInfo.lastStableOffset),
preferredReadReplica = preferredReadReplica,
exception = None)
}
} catch {
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate un-expected failure of a broker in handling a fetch request
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderOrFollowerException |
_: UnknownLeaderEpochException |
_: FencedLeaderEpochException |
_: ReplicaNotAvailableException |
_: KafkaStorageException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,
followerLogStartOffset = Log.UnknownOffset,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
case e: Throwable =>
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
val fetchSource = Request.describeReplicaId(replicaId)
error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
s"on partition $tp: $fetchInfo", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,
followerLogStartOffset = Log.UnknownOffset,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
}
}
var limitBytes = fetchMaxBytes
val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
var minOneMessage = !hardMaxBytesLimit
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
val recordBatchSize = readResult.info.records.sizeInBytes
// once we read from a non-empty partition, we stop ignoring request and partition level size limits
if (recordBatchSize > 0)
minOneMessage = false
limitBytes = math.max(0, limitBytes - recordBatchSize)
result += (tp -> readResult)
}
result
}
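Two pieces of state drive the loop at the end of readFromLocalLog(): the shared limitBytes budget and the minOneMessage flag. The Java sketch below models them under a deliberate simplification. Each partition is reduced to three numbers, {availableBytes, firstBatchBytes, partitionMaxBytes}. "Reading" returns min(available, allowed), which is roughly what LogSegment.read() computes further down.

```java
import java.util.ArrayList;
import java.util.List;

final class ResponseBudget {
    /** Bytes read per partition, in request order, under a shared fetchMaxBytes budget. */
    static List<Integer> readAll(List<int[]> partitions, int fetchMaxBytes, boolean hardMaxBytesLimit) {
        int limitBytes = fetchMaxBytes;
        boolean minOneMessage = !hardMaxBytesLimit;
        List<Integer> result = new ArrayList<>();
        for (int[] p : partitions) {
            int available = p[0], firstBatch = p[1], partitionMax = p[2];
            int adjustedMaxBytes = Math.min(partitionMax, limitBytes);
            // minOneMessage lets one oversized first batch through so the client can make progress.
            int allowed = minOneMessage ? Math.max(adjustedMaxBytes, firstBatch) : adjustedMaxBytes;
            int read = Math.min(available, allowed);
            if (read > 0) minOneMessage = false;   // only the first non-empty partition is exempt
            limitBytes = Math.max(0, limitBytes - read);
            result.add(read);
        }
        return result;
    }
}
```

With a 2500-byte budget and a 3000-byte first batch in the second partition, the soft limit returns that whole batch and leaves nothing for the third partition. A hard limit (old request versions) instead caps the second partition at its 1000-byte share.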
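When the request carries lastFetchedEpoch, Partition.scala#readRecords() first compares that leader epoch and the fetch offset with the epoch information held on the server. This tells it whether the requester's log has diverged from the leader's. If it has, the method returns an empty result carrying divergingEpoch. Otherwise it calls Log.scala#read() and descends into the next layer, the Log data structure, to read the data.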
def readRecords(lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
fetchOnlyFromLeader: Boolean,
minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
// Note we use the log end offset prior to the read. This ensures that any appends following
// the fetch do not prevent a follower from coming into sync.
val initialHighWatermark = localLog.highWatermark
val initialLogStartOffset = localLog.logStartOffset
val initialLogEndOffset = localLog.logEndOffset
val initialLastStableOffset = localLog.lastStableOffset
lastFetchedEpoch.ifPresent { fetchEpoch =>
val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
val error = Errors.forCode(epochEndOffset.errorCode)
if (error != Errors.NONE) {
throw error.exception()
}
if (epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
throw new OffsetOutOfRangeException("Could not determine the end offset of the last fetched epoch " +
s"$lastFetchedEpoch from the request")
}
// If fetch offset is less than log start, fail with OffsetOutOfRangeException, regardless of whether epochs are diverging
if (fetchOffset < initialLogStartOffset) {
throw new OffsetOutOfRangeException(s"Received request for offset $fetchOffset for partition $topicPartition, " +
s"but we only have log segments in the range $initialLogStartOffset to $initialLogEndOffset.")
}
if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
val emptyFetchData = FetchDataInfo(
fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
firstEntryIncomplete = false,
abortedTransactions = None
)
val divergingEpoch = new FetchResponseData.EpochEndOffset()
.setEpoch(epochEndOffset.leaderEpoch)
.setEndOffset(epochEndOffset.endOffset)
return LogReadInfo(
fetchedData = emptyFetchData,
divergingEpoch = Some(divergingEpoch),
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset)
}
}
val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
LogReadInfo(
fetchedData = fetchedData,
divergingEpoch = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset)
}
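The divergence test in readRecords() reduces to one condition on the leader's answer to "where did your copy of my last fetched epoch end?". The Java sketch below isolates that condition. EpochEndOffset here is a hypothetical two-field holder, not the generated FetchResponseData class.

```java
final class EpochCheck {
    /** Leader's view: the largest epoch <= the requested one, and the offset where it ended. */
    static final class EpochEndOffset {
        final int leaderEpoch;
        final long endOffset;
        EpochEndOffset(int leaderEpoch, long endOffset) { this.leaderEpoch = leaderEpoch; this.endOffset = endOffset; }
    }

    /**
     * Same condition as readRecords(): the requester has diverged if the leader never had its
     * last fetched epoch, or that epoch ended on the leader before fetchOffset.
     * Returns the leader's view to send back, or null when there is no divergence.
     */
    static EpochEndOffset divergingEpoch(int lastFetchedEpoch, long fetchOffset, EpochEndOffset leaderView) {
        if (leaderView.leaderEpoch < lastFetchedEpoch || leaderView.endOffset < fetchOffset) {
            return leaderView;
        }
        return null;
    }
}
```

For example, take a follower that fetched up to offset 120 in epoch 5. If epoch 5 ended at offset 100 on the leader, the follower holds 20 records the leader never had, and the divergingEpoch in the response lets it truncate them.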
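The key steps of Log.scala#read() are:
- Kafka's message data is spread across multiple log segments, so the method first calls LogSegments.scala#floorSegment() with the startOffset parameter. This locates the LogSegment with the largest base offset not greater than startOffset
- The upper bound of the read, maxOffsetMetadata, depends on the isolation level: the log end offset, the high watermark or the last stable offset
- It then calls LogSegment.scala#read() to read message data at the given position in that segment. If the segment has no data beyond startOffset, it moves on to the next segment via higherSegment()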
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
s"total length $size bytes")
val includeAbortedTxns = isolation == FetchTxnCommitted
// Because we don't use the lock for reading, the synchronization is a little bit tricky.
// We create the local variables to avoid race conditions with updates to the log.
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetmetadata.messageOffset
var segmentOpt = segments.floorSegment(startOffset)
// return error on attempt to read beyond the log end offset or read below log start offset
if (startOffset > endOffset || segmentOpt.isEmpty || startOffset < logStartOffset)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments in the range $logStartOffset to $endOffset.")
val maxOffsetMetadata = isolation match {
case FetchLogEnd => endOffsetMetadata
case FetchHighWatermark => fetchHighWatermarkMetadata
case FetchTxnCommitted => fetchLastStableOffsetMetadata
}
if (startOffset == maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
else if (startOffset > maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
else {
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
var fetchDataInfo: FetchDataInfo = null
while (fetchDataInfo == null && segmentOpt.isDefined) {
val segment = segmentOpt.get
val baseOffset = segment.baseOffset
val maxPosition =
// Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
else segment.size
fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
} else segmentOpt = segments.higherSegment(baseOffset)
}
if (fetchDataInfo != null) fetchDataInfo
else {
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
}
}
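floorSegment() and higherSegment() are ordered-map queries keyed by segment base offset. The Java sketch below assumes a ConcurrentSkipListMap from base offset to a segment name, which I believe is close to what LogSegments keeps internally; treat it as an illustration rather than the real class. Segment files are named after their 20-digit base offset.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class SegmentLookup {
    // baseOffset -> segment file name (hypothetical stand-in for LogSegment objects)
    private final NavigableMap<Long, String> segments = new ConcurrentSkipListMap<>();

    void add(long baseOffset, String name) { segments.put(baseOffset, name); }

    /** Like floorSegment(): the segment with the largest base offset <= offset, or null. */
    String floorSegment(long offset) {
        Map.Entry<Long, String> e = segments.floorEntry(offset);
        return e == null ? null : e.getValue();
    }

    /** Like higherSegment(): the segment that follows the one starting at baseOffset, or null. */
    String higherSegment(long baseOffset) {
        Map.Entry<Long, String> e = segments.higherEntry(baseOffset);
        return e == null ? null : e.getValue();
    }
}
```

A read at offset 742 against segments starting at 0, 500 and 1200 begins in the segment starting at 500. If that segment yields nothing past offset 742, the loop in Log.read() continues with the segment starting at 1200.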
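The core steps of LogSegment.scala#read() are:
- Call LogSegment.scala#translateOffset() to convert the message offset into the message's actual physical position in the file
- Once the physical position is known, call FileRecords.java#slice() to return a logically sliced FileRecords object that represents the message data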
@threadsafe
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
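The size arithmetic in LogSegment.read() can be checked in isolation. The Java sketch below restates it. startPosition and firstBatchSize are what translateOffset() returns, and maxPosition is the isolation-dependent bound passed down from Log.read(). The int[] return shape is just a convenience for the sketch.

```java
final class SegmentReadSize {
    /** Returns {fetchSize, firstEntryIncomplete ? 1 : 0}, following LogSegment.read(). */
    static int[] fetchSize(int startPosition, int firstBatchSize, long maxPosition, int maxSize, boolean minOneMessage) {
        if (maxSize < 0) throw new IllegalArgumentException("Invalid max size " + maxSize);
        // With minOneMessage, always allow at least the whole first batch.
        int adjustedMaxSize = minOneMessage ? Math.max(maxSize, firstBatchSize) : maxSize;
        if (adjustedMaxSize == 0) return new int[] {0, 0};   // empty read
        // Never read past maxPosition (e.g. the high watermark's position in this segment).
        int size = (int) Math.min(maxPosition - startPosition, adjustedMaxSize);
        boolean firstEntryIncomplete = adjustedMaxSize < firstBatchSize;
        return new int[] {size, firstEntryIncomplete ? 1 : 0};
    }
}
```

With a 2000-byte first batch and maxSize 1024, the read is marked firstEntryIncomplete unless minOneMessage raises the limit to 2000. That flag is the one readFromLocalLog() checks before replacing the result with an empty set.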
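LogSegment.scala#translateOffset() is the key to mapping a logical offset onto a physical position. Its key steps are:
- Call OffsetIndex.scala#lookup() to find a file position for the message in the offset index file. Note that Kafka's index is sparse, so for the same amount of space it can cover far more data than a dense index
- Call FileRecords.java#searchForOffsetWithSize(), which combines the requested offset with the position recorded in the index to determine where reading of the message file should start
Dense index
A dense index has an index record for every search key value. This speeds up searches but needs more space to store the index records themselves.
Sparse index
A sparse index does not create an index record for every search key, so the keys of two physically adjacent index records are generally not consecutive. You can think of it as somewhat like a skip list: the index gets you close to the target, and a short sequential scan covers the rest.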
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
val mapping = offsetIndex.lookup(offset)
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
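To make "sparse" concrete, the Java sketch below builds an index that records an entry only after more than intervalBytes of data has been appended since the last entry. Each entry maps a batch's last offset to its start position. The rule is my simplified reading of how LogSegment.append() uses index.interval.bytes; SparseIndex and its methods are hypothetical. lookup() returns the floor position that translateOffset() then refines by scanning.

```java
import java.util.ArrayList;
import java.util.List;

final class SparseIndex {
    private final List<Long> offsets = new ArrayList<>();     // indexed offsets, ascending
    private final List<Integer> positions = new ArrayList<>(); // matching file positions
    private final int intervalBytes;                          // hypothetical knob, cf. index.interval.bytes
    private int bytesSinceLastEntry = 0;

    SparseIndex(int intervalBytes) { this.intervalBytes = intervalBytes; }

    /** Called for every appended batch; adds an entry only once enough bytes have accumulated. */
    void onAppend(long lastOffset, int position, int sizeInBytes) {
        if (bytesSinceLastEntry > intervalBytes) {
            offsets.add(lastOffset);
            positions.add(position);
            bytesSinceLastEntry = 0;
        }
        bytesSinceLastEntry += sizeInBytes;
    }

    int entries() { return offsets.size(); }

    /** File position of the largest indexed offset <= target, or 0 (segment start) if none. */
    int lookup(long target) {
        int lo = 0, hi = offsets.size() - 1, best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets.get(mid) <= target) { best = mid; lo = mid + 1; } else { hi = mid - 1; }
        }
        return best == -1 ? 0 : positions.get(best);
    }
}
```

Ten 60-byte batches with a 100-byte interval produce only 4 index entries instead of 10. A lookup for offset 55 lands on the batch starting at byte 240 (offsets 40 to 49), and the forward scan finds offset 55 in the next batch.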
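The source of OffsetIndex.scala#lookup() is shown below. Two points are key:
- The offset index is accessed through an mmap memory mapping of the index file, so reading it does not require copying index data from the kernel into a user-space buffer, which improves performance. mmap.duplicate gives this lookup its own position/limit view over the shared mapped buffer
- It calls AbstractIndex.scala#largestLowerBoundSlotFor() to search the index for the slot that determines where reading of the message data starts. If no such slot exists (slot == -1), it falls back to (baseOffset, 0), i.e. the start of the segment file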
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
}
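The slots that lookup() parses have a fixed layout. Each offset index entry is 8 bytes: a 4-byte offset relative to the segment's base offset, followed by a 4-byte physical position. The Java sketch below writes and parses that layout in a heap ByteBuffer rather than an mmap; the helper names are hypothetical. parseEntry uses absolute getInt(index) reads, so the buffer's position is never moved.

```java
import java.nio.ByteBuffer;

final class OffsetIndexEntries {
    static final int ENTRY_SIZE = 8;  // 4-byte relative offset + 4-byte file position

    /** Lays out entries as an offset index does (heap buffer here instead of an mmap). */
    static ByteBuffer build(long baseOffset, long[] offsets, int[] positions) {
        ByteBuffer buf = ByteBuffer.allocate(offsets.length * ENTRY_SIZE);
        for (int i = 0; i < offsets.length; i++) {
            buf.putInt((int) (offsets[i] - baseOffset));  // stored relative to the segment base offset
            buf.putInt(positions[i]);
        }
        return buf;  // position is left at the end; readers use absolute gets
    }

    /** Returns {absoluteOffset, position} for slot n. */
    static long[] parseEntry(ByteBuffer idx, long baseOffset, int n) {
        long offset = baseOffset + idx.getInt(n * ENTRY_SIZE);
        long position = idx.getInt(n * ENTRY_SIZE + 4);
        return new long[] {offset, position};
    }
}
```

Storing offsets relative to the base offset is what lets a 64-bit offset fit in 4 bytes per entry. Calling duplicate() on the buffer, as lookup() does, shares the bytes but gives the caller an independent position and limit.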
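The main logic of AbstractIndex.scala#largestLowerBoundSlotFor() is a binary search over the index data that finds where, physically, the message data starts in the file. Note that the index is effectively split into a cold section and a warm section at its tail. The key points are:
- It subtracts the warm-section size _warmEntries from the total number of index entries _entries to get the first entry of the warm section (firstHotEntry = max(0, _entries - 1 - _warmEntries)). If the target is greater than that entry, the binary search runs only over the tail of the index. Only otherwise does it search the front part
- The reason is that Kafka appends index entries at the end, and newly written data is usually read soon afterwards, so reads concentrate at the tail. Index data normally lives in the page cache, but memory is finite and the operating system evicts page-cache pages with an LRU-like policy. A plain binary search over the whole index would probe entries in the middle of the file on every lookup. Those pages have probably been evicted already, which causes page faults and forces the file to be reread from disk, hurting performance
The page cache, also called the file buffer cache, is the in-memory cache of file system data. Kafka's message storage relies heavily on it as well. If messages are consumed about as fast as they are written, consumers will very likely hit the cache and skip disk I/O entirely, which greatly improves performance. When a consumer falls behind, however, its reads of old data can churn the page cache on the Kafka node and drag down the performance of the whole cluster.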
protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
indexSlotRangeFor(idx, target, searchEntity)._1
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
// check if the index is empty
if(_entries == 0)
return (-1, -1)
def binarySearch(begin: Int, end: Int) : (Int, Int) = {
// binary search for the entry
var lo = begin
var hi = end
while(lo < hi) {
val mid = (lo + hi + 1) >>> 1
val found = parseEntry(idx, mid)
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult > 0)
hi = mid - 1
else if(compareResult < 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
}
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// check if the target offset is in the warm section of the index
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
return binarySearch(firstHotEntry, _entries - 1)
}
// check if the target offset is smaller than the least offset
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
return (-1, 0)
binarySearch(0, firstHotEntry)
}
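The warm-section search above ports directly to Java over a sorted long[] of keys. In the sketch below, warmEntries is an explicit parameter, whereas in Kafka it is derived from the index's entry size. Only the first element of indexSlotRangeFor's result, the one largestLowerBoundSlotFor() uses, is returned.

```java
final class WarmIndexSearch {
    /** Slot of the largest key <= target among the first `entries` keys, or -1 if none. */
    static int largestLowerBoundSlot(long[] keys, int entries, int warmEntries, long target) {
        if (entries == 0) return -1;
        int firstHotEntry = Math.max(0, entries - 1 - warmEntries);
        // Target lies in the warm tail: search only there, touching recently used pages.
        if (keys[firstHotEntry] < target) return binarySearch(keys, firstHotEntry, entries - 1, target);
        if (keys[0] > target) return -1;   // smaller than the first indexed key
        return binarySearch(keys, 0, firstHotEntry, target);
    }

    private static int binarySearch(long[] keys, int begin, int end, long target) {
        int lo = begin, hi = end;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;   // round up so that lo = mid always makes progress
            if (keys[mid] > target) hi = mid - 1;
            else if (keys[mid] < target) lo = mid;
            else return mid;
        }
        return lo;
    }
}
```

With 10 keys and warmEntries = 3, a target of 75 only touches slots 6 to 9. A target of 25 falls back to searching slots 0 to 6.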
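The main logic of FileRecords.java#searchForOffsetWithSize() is to iterate over the record batches in the file, starting from the given position. It returns the offset, physical position and size of the first batch whose last offset is greater than or equal to the target offset.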
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}
public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
return () -> batchIterator(start);
}
private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
final int end;
if (isSlice)
end = this.end;
else
end = this.sizeInBytes();
FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
return new RecordBatchIterator<>(inputStream);
}
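The scan in searchForOffsetWithSize() can be modeled with an in-memory list of batch headers. In the Java sketch below, Batch is a hypothetical stand-in for FileChannelRecordBatch that keeps only position, size and last offset. Note that the batch returned may start below the target offset, because the match is on the batch's last offset.

```java
import java.util.List;

final class BatchScan {
    /** Minimal stand-in for FileChannelRecordBatch. */
    static final class Batch {
        final int position;
        final int sizeInBytes;
        final long lastOffset;
        Batch(int position, int sizeInBytes, long lastOffset) {
            this.position = position; this.sizeInBytes = sizeInBytes; this.lastOffset = lastOffset;
        }
    }

    /** First batch at or after startingPosition whose lastOffset >= targetOffset, or null. */
    static Batch search(List<Batch> batchesInFileOrder, long targetOffset, int startingPosition) {
        for (Batch b : batchesInFileOrder) {
            if (b.position < startingPosition) continue;  // batchesFrom(start) begins iterating here
            if (b.lastOffset >= targetOffset) return b;
        }
        return null;  // target is beyond the end of this segment
    }
}
```

Starting from the index hint is only an optimization: scanning from position 0 finds the same batch, just after reading more batch headers.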
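FileRecords.java#slice() returns a FileRecords object that is a logical slice defined by positions. Note that the slice shares the underlying FileChannel, and no data is read at this point. Later, when the data is actually sent over the network, FileRecords.java#writeTo() writes the data at the given position of the channel to the socket.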
public FileRecords slice(int position, int size) throws IOException {
int availableBytes = availableBytes(position, size);
int startPosition = this.start + position;
return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}
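A slice is nothing more than a pair of absolute positions over the same file. The Java sketch below models that with a hypothetical SliceView class. The bounds handling is a simplification of FileRecords' availableBytes() check: the new end is clamped so a slice never extends past its parent, and long arithmetic avoids the int overflow a huge size would otherwise cause.

```java
final class SliceView {
    final int start;  // absolute start position in the underlying file
    final int end;    // absolute end position (exclusive)

    SliceView(int start, int end) { this.start = start; this.end = end; }

    int sizeInBytes() { return end - start; }

    /** Simplified slice(): a window relative to this one, clamped to this view's end. No bytes are read. */
    SliceView slice(int position, int size) {
        if (position < 0 || position > sizeInBytes() || size < 0)
            throw new IllegalArgumentException("Invalid slice position " + position + " / size " + size);
        long requestedEnd = (long) start + position + size;
        int clampedEnd = (int) Math.min(requestedEnd, end);
        return new SliceView(start + position, clampedEnd);
    }
}
```

Slicing 1024 bytes at position 4096 of a 10000-byte file yields the window [4096, 5120). A slice of that slice with an effectively unlimited size still stops at 5120.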
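FileRecords.java#writeTo() is shown below. When the data is actually sent, the file contents do not have to be copied into user space. For plaintext connections, destChannel.transferFrom() ends up calling FileChannel#transferTo(), which moves the data to the socket buffer inside the kernel (zero-copy) and greatly improves performance. With SSL/TLS the data must be encrypted in user space, so this zero-copy path is not available there.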
@Override
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
long count = Math.min(length, oldSize - offset);
return destChannel.transferFrom(channel, position, count);
}
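The zero-copy step itself is a single JDK call, FileChannel.transferTo(position, count, target). The Java sketch below wraps it in a loop because transferTo may move fewer bytes than requested. ZeroCopySend is a hypothetical helper, not Kafka's transport layer. Whether the kernel actually avoids the user-space copy depends on the OS and on the target being a socket; with any other channel the JDK may fall back to an ordinary copy.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class ZeroCopySend {
    /** Sends `count` bytes starting at `position` of `file` to `target`; returns bytes sent. */
    static long send(FileChannel file, long position, long count, WritableByteChannel target) throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = file.transferTo(position + sent, count - sent, target);
            if (n <= 0) break;   // EOF, or nothing could be written right now (e.g. a full non-blocking socket)
            sent += n;
        }
        return sent;
    }
}
```

Kafka's own writeTo() does not loop. It returns the number of bytes transferred and lets the network layer call it again when the socket is writable, which fits its non-blocking selector model.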



