ReplicaManager is responsible for synchronizing the partition replica data of topics in a Kafka cluster. When replicas in the cluster change, e.g. when a partition's replica set goes through a leader switch, ReplicaManager receives commands from the controller and performs different operations according to the command, thereby carrying out replica management.
ReplicaManager mainly receives two kinds of commands, LeaderAndISRCommand and StopReplicaCommand. The operations it has to perform are mainly:
1. Decide whether a replica becomes the leader or a follower, and carry out the follow-up work for becoming leader or follower.
2. On receiving a StopReplicaCommand, i.e. when a replica is deleted or the broker hosting it goes offline, perform the corresponding resource cleanup.
3. Start the periodic maybeShrinkIsr thread.
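The dispatch of these controller commands can be sketched as follows. This is a simplified illustration in Java, not Kafka's actual API; the class name, method names, and action strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of how a replica manager dispatches controller commands.
// Kafka's real ReplicaManager is written in Scala; names here are made up.
public class ReplicaManagerSketch {
    public static List<String> handle(String command, boolean becomeLeader) {
        List<String> actions = new ArrayList<>();
        switch (command) {
            case "LeaderAndIsr":
                if (becomeLeader) {
                    // a new leader stops fetching and starts serving reads/writes
                    actions.add("stop fetcher thread");
                    actions.add("become leader");
                } else {
                    // a follower (re)points its fetcher at the new leader
                    actions.add("become follower");
                    actions.add("start fetcher thread to new leader");
                }
                break;
            case "StopReplica":
                // replica deleted or broker going offline: clean up resources
                actions.add("stop fetcher thread");
                actions.add("delete local log if requested");
                break;
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(handle("LeaderAndIsr", true));
        System.out.println(handle("StopReplica", false));
    }
}
```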
The definition of Partition in Kafka:
class Partition(val topic: String,          // topic name
                val partitionId: Int,       // partition id
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
  ......
  // the set of replicas assigned to this partition
  private val assignedReplicaMap = new Pool[Int, Replica]
  // the set of replicas currently in sync (the ISR)
  var inSyncReplicas: Set[Replica] = Set.empty[Replica]
  ......
}
The definition of Replica:
class Replica(val brokerId: Int,            // the broker this replica lives on
              val partition: Partition,     // the partition it belongs to
              time: Time = SystemTime,
              initialHighWatermarkValue: Long = 0L,
              // the backing Log: message data and index files
              val log: Option[Log] = None) extends Logging {
}
In Kafka a partition has multiple replicas: one of them is the leader and the rest are followers. The leader receives messages from producers and appends them to its log; each follower periodically pulls newly appended messages from the leader and writes them to its local log. A follower keeps a partition's messages in sync mainly by running a ReplicaFetcherThread (the replica fetch thread) and through the high watermark mechanism.
One more important concept is the ISR: the set of all replicas that are in sync, consisting of the leader replica and some number of followers.
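A follower stays in the ISR only while it keeps up with the leader. A minimal sketch of such a lag-based membership check follows; the class, method, and parameter names are made up for illustration (the threshold is modeled on Kafka's replica.lag.time.max.ms setting).

```java
// Sketch: a follower is considered in sync if it was last fully caught up
// with the leader recently enough. All names here are illustrative.
public class IsrCheck {
    public static boolean isInSync(long nowMs, long lastCaughtUpMs, long replicaLagMaxMs) {
        // The follower drops out of the ISR once it has not been fully
        // caught up for longer than the configured maximum lag time.
        return nowMs - lastCaughtUpMs <= replicaLagMaxMs;
    }

    public static void main(String[] args) {
        System.out.println(isInSync(10_000, 9_500, 10_000));  // still in sync
        System.out.println(isInSync(30_000, 9_500, 10_000));  // fell behind
    }
}
```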
ReplicaFetcherThread extends AbstractFetcherThread, whose internal processing looks like this:
abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                     isInterruptible: Boolean = true)
  extends ShutdownableThread(name, isInterruptible) {
  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
  ......
  override def doWork() {
    inLock(partitionMapLock) {
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
      // partitionMap holds the TopicAndPartitions this fetcher thread is
      // responsible for, so iterating over it builds the fetch request
      partitionMap.foreach {
        case ((topicAndPartition, offset)) =>
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                                       offset, fetchSize)
      }
    }
    val fetchRequest = fetchRequestBuilder.build()
    if (!fetchRequest.requestInfo.isEmpty)
      // send and handle the fetch request; on success this leads into
      // the processPartitionData flow
      processFetchRequest(fetchRequest)
  }
  ......
}
AbstractFetcherThread extends ShutdownableThread, which calls the doWork method in an endless loop, so messages are fetched from the leader continuously. The function that sends and handles the fetch request is:
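The ShutdownableThread pattern, a loop that calls doWork until shutdown is requested, can be sketched as follows. This is an illustrative Java simplification, not Kafka's Scala implementation; the names mirror the original but the details are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the ShutdownableThread pattern: doWork() runs in a loop until
// shutdown is requested, and shutdown() blocks until the loop has exited.
public abstract class ShutdownableThreadSketch extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // one unit of work, e.g. build a fetch request, send it, handle response
    public abstract void doWork();

    @Override
    public void run() {
        while (running.get()) {
            doWork();
        }
        shutdownLatch.countDown();  // signal that the loop has finished
    }

    public void shutdown() throws InterruptedException {
        running.set(false);
        shutdownLatch.await();      // wait for the loop to exit cleanly
    }
}
```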
private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    // send the fetch request to the leader and wait for its response
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    ......
  }
  fetcherStats.requestRate.mark()
  if (response != null) {
    // handle the response
    inLock(partitionMapLock) {
      response.data.foreach {
        case (topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // if the offset matches what we requested, append the messages to the log
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match {
              case ErrorMapping.NoError =>
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // hand the partition data over to processPartitionData
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
  ......
}
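The offset bookkeeping in the snippet above — advance the fetch offset to just past the last fetched message, or keep the current offset when the response is empty — can be sketched as follows. For simplicity this treats each message's `nextOffset` as `offset + 1`; the class and method names are illustrative.

```java
import java.util.List;

// Sketch of the fetch-offset advance from processFetchRequest: the next fetch
// starts just past the last message received; an empty response leaves the
// offset unchanged.
public class OffsetAdvance {
    public static long nextFetchOffset(List<Long> fetchedOffsets, long currentOffset) {
        if (fetchedOffsets.isEmpty())
            return currentOffset;
        // nextOffset of the message at offset o is o + 1
        return fetchedOffsets.get(fetchedOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextFetchOffset(List.of(5L, 6L, 7L), 5L));  // 8
        System.out.println(nextFetchOffset(List.of(), 5L));            // 5
    }
}
```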
The fetched data itself is handled by the processPartitionData function of ReplicaFetcherThread.
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  try {
    val topic = topicAndPartition.topic
    val partitionId = topicAndPartition.partition
    val replica = replicaMgr.getReplica(topic, partitionId).get
    val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
    // if the requested fetchOffset does not match the local log end offset, throw
    if (fetchOffset != replica.logEndOffset.messageOffset)
      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
    // append the messageSet to the log
    replica.log.get.append(messageSet, assignOffsets = false)
    // update this partition's high watermark
    val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error while replicating data.", e)
      Runtime.getRuntime.halt(1)
  }
}
The main steps above are:
1. Append the fetched messages to the log.
2. Update the partition's HighWaterMark.
The HighWaterMark (high watermark, HW) is the smallest offset up to which every replica in the ISR has replicated its last committed messages: all data before this offset has been synchronized to every replica in the ISR, while data after it has only been received by some of the replicas.
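Under this definition, the leader's high watermark is simply the minimum log end offset across the ISR replicas. A sketch, with illustrative names:

```java
import java.util.Collection;
import java.util.List;

// Sketch: the leader's high watermark is the smallest log end offset (LEO)
// among the ISR replicas, i.e. the largest offset known to be replicated
// to every in-sync replica.
public class HighWatermarkSketch {
    public static long leaderHw(Collection<Long> isrLogEndOffsets) {
        return isrLogEndOffsets.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("ISR must not be empty"));
    }

    public static void main(String[] args) {
        // leader LEO = 10, followers at 8 and 9 -> HW = 8
        System.out.println(leaderHw(List.of(10L, 8L, 9L)));
    }
}
```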
As shown in the figure below:
LEO: the log end offset, i.e. the offset of the next message to be written to the replica's underlying log. Note: the next message! So if LEO = 10, the replica holds 10 messages, with offsets in the range [0, 9]. Also note that the leader's LEO and a follower's LEO are updated differently.
HighWaterMark: the high watermark described above. For any given replica, its HW value never exceeds its LEO value, and every message at an offset less than or equal to the HW is considered "replicated" (committed).
The replica high watermark is updated as follows:
1. A replica in the leader role may update its HW when it receives a FetchRequest from a follower.
2. A replica in the follower role may update its HW when it receives the FetchResponse returned by the leader.
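Step 2 is exactly what processPartitionData does above: the follower clamps its high watermark to whichever is smaller, its own LEO or the leader's HW from the fetch response, since it cannot expose messages it has not yet written. Sketched with illustrative names:

```java
// Sketch of the follower-side HW update seen in processPartitionData:
// the follower's HW is the leader's HW capped by the follower's own LEO.
public class FollowerHwSketch {
    public static long update(long followerLogEndOffset, long leaderHw) {
        return Math.min(followerLogEndOffset, leaderHw);
    }

    public static void main(String[] args) {
        System.out.println(update(5L, 8L));   // follower lags: HW capped at its LEO
        System.out.println(update(10L, 8L));  // follower caught up: HW follows leader
    }
}
```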



