
Kafka ReplicaManager (Replica Management Module) Source Code Analysis

ReplicaManager is responsible for synchronizing partition replica data for topics across a Kafka cluster. When the replica state changes, for example when a Partition's replicas undergo a leader switch, ReplicaManager receives commands from the controller and carries out the corresponding operations, thereby completing the work of replica management.

ReplicaManager mainly receives two kinds of commands: LeaderAndISRCommand and StopReplicaCommand. The operations it needs to perform are:

1. Determine whether a Replica becomes the leader or a follower, and perform the follow-up work for becoming the leader or a follower

2. When a Replica is deleted or its broker goes offline, receive a StopReplicaCommand and perform the corresponding resource cleanup

3. Start the periodic maybeShrinkIsr task
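The three responsibilities above can be sketched as a small model. This is a simplified illustration only, not Kafka's actual code: the object name, method names, and signatures below are stand-ins invented for this sketch.

```scala
// Simplified model of how ReplicaManager reacts to controller commands.
// All names here are illustrative stand-ins, not Kafka's real API.
object ReplicaManagerSketch {
  // partitionId -> role on this broker ("leader" or "follower")
  private val roles = scala.collection.mutable.Map.empty[Int, String]

  // LeaderAndISRCommand: decide leader vs. follower per partition
  def handleLeaderAndIsr(toLeader: Set[Int], toFollower: Set[Int]): Map[Int, String] = {
    toLeader.foreach(p => roles(p) = "leader")      // stop fetching, start serving
    toFollower.foreach(p => roles(p) = "follower")  // truncate log, start fetcher
    roles.toMap
  }

  // StopReplicaCommand: clean up resources for a removed replica
  def handleStopReplica(partitionId: Int): Map[Int, String] = {
    roles.remove(partitionId)                       // remove fetcher / local state
    roles.toMap
  }
}
```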

The definition of Partition in Kafka:

class Partition(val topic: String,  // topic name
                val partitionId: Int,  // partition id
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   ......
   // the set of Replicas assigned to this Partition
   private val assignedReplicaMap = new Pool[Int, Replica]
   // the set of Replicas currently in sync (the ISR)
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   ......
}

The definition of Replica:

class Replica(val brokerId: Int,   // the broker this Replica lives on
              val partition: Partition,  // the Partition it belongs to
              time: Time = SystemTime,
              initialHighWatermarkValue: Long = 0L,
              // the underlying Log: log and index data
              val log: Option[Log] = None) extends Logging {
}

In Kafka a Partition has multiple Replicas; one of them is the Leader and the rest are Followers. The Leader receives messages from producers and writes them to its Log, while each Follower periodically pulls newly appended messages from the Leader and writes them to its local Log. Followers synchronize a partition's messages mainly through the ReplicaFetcherThread (the replica fetch thread) and the High Watermark mechanism.

Beyond this, there is one more important concept, the ISR: the set of all replicas currently in sync, consisting of one leader replica and some number of follower replicas.
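A follower stays in the ISR only while it keeps up with the leader; the periodic maybeShrinkIsr task mentioned earlier removes laggards. The toy model below illustrates the idea under one assumption: followers are dropped purely on a lag-time threshold (in Kafka this is governed by configuration such as replica.lag.time.max.ms). The case class and function names are invented for this sketch.

```scala
// Toy model of ISR shrinking: a follower is dropped from the ISR when it
// has not caught up with the leader within the allowed lag time.
case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

def shrinkIsr(isr: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
  isr.filter(f => nowMs - f.lastCaughtUpTimeMs <= maxLagMs)
```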

ReplicaFetcherThread extends AbstractFetcherThread. The internal logic of AbstractFetcherThread looks like this:

abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                     isInterruptible: Boolean = true)
  extends ShutdownableThread(name, isInterruptible) {
  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map

  ......
  override def doWork() {
    inLock(partitionMapLock) {
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
      // partitionMap holds the TopicAndPartitions this fetch thread is responsible
      // for, so iterating over it yields the entries for the fetchRequest
      partitionMap.foreach {
        case((topicAndPartition, offset)) =>
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                           offset, fetchSize)
      }
    }

    val fetchRequest = fetchRequestBuilder.build()
    if (!fetchRequest.requestInfo.isEmpty)
      // send and process the fetchRequest; on success this leads into processPartitionData
      processFetchRequest(fetchRequest)
  }
 ......
}

AbstractFetcherThread extends ShutdownableThread, which internally calls doWork in an endless loop, so messages are fetched from the leader continuously. The function that sends and processes the fetchRequest is:

private def processFetchRequest(fetchRequest: FetchRequest) {
    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
    var response: FetchResponse = null
    try {
      // send the fetchRequest to the leader and collect its response
      response = simpleConsumer.fetch(fetchRequest)
    } catch {
       ......
    }
    fetcherStats.requestRate.mark()

    if (response != null) {
      // process the response
      inLock(partitionMapLock) {
        response.data.foreach {
          case(topicAndPartition, partitionData) =>
            val (topic, partitionId) = topicAndPartition.asTuple
            val currentOffset = partitionMap.get(topicAndPartition)
            // if the offset checks out, write the messages to the log
            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
              partitionData.error match {
                case ErrorMapping.NoError =>
                  try {
                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                    val validBytes = messages.validBytes
                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
                      case Some(m: MessageAndOffset) => m.nextOffset
                      case None => currentOffset.get
                    }
                    partitionMap.put(topicAndPartition, newOffset)
                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                    fetcherStats.byteRate.mark(validBytes)
                    // hand partitionData off for processing
                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
......
}

The fetched data itself is handled by the processPartitionData function in ReplicaFetcherThread:

// process fetched data
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
    try {
      val topic = topicAndPartition.topic
      val partitionId = topicAndPartition.partition
      val replica = replicaMgr.getReplica(topic, partitionId).get
      val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
      // if the requested fetchOffset does not match the local log end offset, throw an exception
      if (fetchOffset != replica.logEndOffset.messageOffset)
        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))

      // append the messageSet to the log
      replica.log.get.append(messageSet, assignOffsets = false)
      // update this partition's highWatermark
      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)

    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error while replicating data.", e)
        Runtime.getRuntime.halt(1)
    }
  }

The flow above consists mainly of:

1. Writing the fetched messages to the log

2. Updating the partition's HighWaterMark

The HighWaterMark (high watermark) is the minimum of the last committed message offsets across all replicas in the ISR. In other words, every message before this offset has already been replicated to all replicas in the ISR, while messages after it have been received by only some of the replicas.

As shown in the following diagram (not reproduced here):

LEO: the log end offset, which records the offset of the next message to be written to the replica's underlying log. Note that it is the next message: if LEO = 10, the replica holds 10 messages with offsets in the range [0, 9]. Also note that leader LEO and follower LEO are updated differently. HighWaterMark: the high watermark mentioned above. For a given replica, the HW value never exceeds the LEO value, and all messages at offsets up to and including the HighWaterMark are considered replicated.
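The relationship between LEO and HW can be captured in a tiny helper. The function name is hypothetical, but the min-with-the-leader's-HW rule matches the processPartitionData code shown earlier, where the follower caps its HW at its own log end offset:

```scala
// A follower's HW is the smaller of its own LEO and the HW the leader
// reported in the FetchResponse: the follower cannot consider as committed
// anything it has not yet written, nor anything the leader has not committed.
def followerHighWatermark(followerLeo: Long, leaderHw: Long): Long =
  math.min(followerLeo, leaderHw)
```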

The high-watermark update flow for replication is as follows:

1. A Replica in the leader role may update its HighWaterMark when it receives a FetchRequest from a follower;

2. A Replica in the follower role may update its HighWaterMark when it receives the FetchResponse returned by the leader.
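Step 1 can be sketched as a minimal model, under the assumption that the leader recomputes the candidate HW as the minimum LEO across the ISR and only ever moves the HW forward (the function name maybeIncrementLeaderHw is invented for this sketch):

```scala
// Leader-side HW update sketch: the new HW candidate is the smallest LEO
// among all in-sync replicas; the HW is monotonic and never moves backward.
def maybeIncrementLeaderHw(currentHw: Long, isrLeos: Seq[Long]): Long = {
  val newHw = isrLeos.min
  if (newHw > currentHw) newHw else currentHw
}
```

This is why a slow follower in the ISR holds the HW back for the whole partition, and why shrinking the ISR (see maybeShrinkIsr above) lets the HW advance again.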
