栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KafkaApis处理FetchRequest请求源码解析

KafkaApis处理FetchRequest请求源码解析

KafkaApis模块是Kafka中负责不同业务请求的具体实现逻辑,本文主要讲一下KafkaApis处理FetchRequest请求的流程。

当状态为Follower的Replica向状态为Leader的Replica同步数据或者消费者获取数据时,Replica会发送FetchRequest给Leader所在的Broker Server,Broker Server在接收到FetchRequest请求时,会返回相应的数据,同时还会根据情况更新对应的元数据。其详细的过程如下:

def handleFetchRequest(request: RequestChannel.Request) {
    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
    //根据fetchRequest获取指定的数据
    val dataRead = replicaManager.readMessageSets(fetchRequest)

    //如果更新请求是来自follower,还需要更新元数据详细以及响应那些延迟的DelayedFetch和DelayedProduce
    if(fetchRequest.isFromFollower)
      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))

    // 统计获取的数据量
    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
    // 统计异常
    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
      errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
    
        replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
          topicAndPartition.partition, replicaId, offset)

       
        replicaManager.unblockDelayedProduceRequests(topicAndPartition)
    }
  }

对于后者主要是判断当前是否有足够的数据可以拉取,如果有足够的数据可以拉取,则相应那些DelayedProduceRequest,其详细过程如下:

def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
    var accumulatedSize = 0
    val fromFollower = fetch.isFromFollower
    partitionFetchOffsets.foreach {
      case (topicAndPartition, fetchOffset) =>
        try {
          if (fetchOffset != LogOffsetmetadata.UnknownOffsetmetadata) {
            //获取leader replica
            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
            
              return true
            } else if (fetchOffset.offsetonOlderSegment(endOffset)) {
              
            
              return true
            } else if (fetchOffset.precedes(endOffset)) {
              //在当前segment上,并且fetchoffset < endoffset,说明有数据可以获取
              accumulatedSize += endOffset.positionDiff(fetchOffset)
            }
          }
        } catch {
          case utpe: UnknownTopicOrPartitionException => // Case A
            return true
          case nle: NotLeaderForPartitionException =>  // Case B
            return true
        }
    }

    // 统计可以拉取的数据量是否满足要求
    accumulatedSize >= fetch.minBytes
  }

在kafka集群正常运行过程中,fetchOffset和endOffset都位于当前Segment上,因此主要统计可以fetch的数据量是否满足最小数据量的要求,只要满足,就可以响应那些DelayedFetchRequest。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728656.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号