KafkaApis模块是Kafka中负责不同业务请求的具体实现逻辑,本文主要讲一下KafkaApis处理FetchRequest请求的流程。
当状态为Follower的Replica向状态为Leader的Replica同步数据或者消费者获取数据时,Replica会发送FetchRequest给Leader所在的Broker Server,Broker Server在接收到FetchRequest请求时,会返回相应的数据,同时还会根据情况更新对应的元数据。其详细的过程如下:
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
//根据fetchRequest获取指定的数据
val dataRead = replicaManager.readMessageSets(fetchRequest)
//如果更新请求是来自follower,还需要更新元数据详细以及响应那些延迟的DelayedFetch和DelayedProduce
if(fetchRequest.isFromFollower)
recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
// 统计获取的数据量
val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
// 统计异常
val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
topicAndPartition.partition, replicaId, offset)
replicaManager.unblockDelayedProduceRequests(topicAndPartition)
}
}
对于后者主要是判断当前是否有足够的数据可以拉取,如果有足够的数据可以拉取,则相应那些DelayedProduceRequest,其详细过程如下:
def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
var accumulatedSize = 0
val fromFollower = fetch.isFromFollower
partitionFetchOffsets.foreach {
case (topicAndPartition, fetchOffset) =>
try {
if (fetchOffset != LogOffsetmetadata.UnknownOffsetmetadata) {
//获取leader replica
val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
return true
} else if (fetchOffset.offsetonOlderSegment(endOffset)) {
return true
} else if (fetchOffset.precedes(endOffset)) {
//在当前segment上,并且fetchoffset < endoffset,说明有数据可以获取
accumulatedSize += endOffset.positionDiff(fetchOffset)
}
}
} catch {
case utpe: UnknownTopicOrPartitionException => // Case A
return true
case nle: NotLeaderForPartitionException => // Case B
return true
}
}
// 统计可以拉取的数据量是否满足要求
accumulatedSize >= fetch.minBytes
}
在kafka集群正常运行过程中,fetchOffset和endOffset都位于当前Segment上,因此主要统计可以fetch的数据量是否满足最小数据量的要求,只要满足,就可以响应那些DelayedFetchRequest。



