栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop Yarn作业调度本地性源码分析

Hadoop Yarn作业调度本地性源码分析

1、Yarn的调度本地性是指将作业分配到数据所在节点,可以减少很多网络IO,对MR作业来说,只有map task有本地性需求,reduce task和failed map task都没有本地性需求

2、Yarn的调度本地性是通过延迟调度来满足的,本地性有3个级别:节点本地、机架本地和随意调度,当调度不能满足本地性时,调度器会计算错过的调度机会数量,并等待该计数达到阈值,然后将本地性约束放宽到下一个级别

3、为了调度到满足数据本地性的节点,可以错过一定数量的调度机会,这个错过机会数量的阈值由以下参数控制:
FairScheduler

// 配置为浮点数,最终错过的节点数为配置 * 集群总节点数
yarn.scheduler.fair.locality.threshold.node 默认为-1.0f
yarn.scheduler.fair.locality.threshold.rack 默认为-1.0f

CapacityScheduler

// 配置为正整数,即最终错过的节点数
yarn.scheduler.capacity.node-locality-delay 默认为40
yarn.scheduler.capacity.rack-locality-additional-delay 默认为-1

4、如果 YARN 与文件系统分开部署,则应禁用此功能,因为本地性没有意义,将以上参数设置为-1即可禁用本地调度功能

container请求事件的产生

TaskAttemptImpl.java

taskAttempt.eventHandler.handle(new ContainerRequestEvent(
            taskAttempt.attemptId, taskAttempt.resourceCapability,
            taskAttempt.dataLocalHosts.toArray(
                new String[taskAttempt.dataLocalHosts.size()]),
            taskAttempt.dataLocalRacks.toArray(
                new String[taskAttempt.dataLocalRacks.size()])));

dataLocalHosts和dataLocalRacks来源于MapTaskAttemptImpl.java

public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitmetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
        // splitInfo.getLocations()就是输入数据split所在节点
        taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
        jobToken, credentials, clock, appContext);
    this.splitInfo = splitInfo;
  }

接下来是RMContainerAllocator.java,处理container请求事件

private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
    ...

    if(mapContainerRequestAccepted) {
      // set the resources
      reqEvent.getCapability().setMemorySize(
          mapResourceRequest.getMemorySize());
      reqEvent.getCapability().setVirtualCores(
          mapResourceRequest.getVirtualCores());
      // 添加map
      scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
    } else {
      ...
    }
  }

scheduledRequests.addMap方法

void addMap(ContainerRequestEvent event) {
      ContainerRequest request = null;
      
      ...
        } else {
          // 创建container请求
          request =
              new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelexpression);
          // 将数据所在节点与task attempt相对应,是为了之后的container分配节点本地性
          for (String host : event.getHosts()) {
            linkedList list = mapsHostMapping.get(host);
            if (list == null) {
              list = new linkedList();
              mapsHostMapping.put(host, list);
            }
            list.add(event.getAttemptID());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Added attempt req to host " + host);
            }
          }
          // 将机架与task attempt相对应,是为了之后的container分配机架本地性
          for (String rack : event.getRacks()) {
            linkedList list = mapsRackMapping.get(rack);
            if (list == null) {
              list = new linkedList();
              mapsRackMapping.put(rack, list);
            }
            list.add(event.getAttemptID());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Added attempt req to rack " + rack);
            }
          }
          maps.put(event.getAttemptID(), request);
          // 调用addContainerReq方法
          addContainerReq(request);
        }
        ...
      }
    }

RMContainerRequestor.java
addContainerReq方法

protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
    for (String host : req.hosts) {
      // Data-local
      if (!isNodeBlacklisted(host)) {
        // 添加本地节点资源请求
        addResourceRequest(req.priority, host, req.capability,
            null);
      }
    }

    // Nothing Rack-local for now
    for (String rack : req.racks) {
      // 添加本地机架资源请求
      addResourceRequest(req.priority, rack, req.capability,
          null);
    }

    // 添加任意位置资源请求
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
        req.nodeLabelexpression);
  }

最终会调用addResourceRequestToAsk方法将资源请求添加进ask

private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
    ...
    ask.remove(remoteRequest);
    ask.add(remoteRequest);    
  }
资源申请过程

AM通过RMContainerAllocator.heartbeat()来对所需资源进行申请,并分配资源:

protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    // 申请资源
    List allocatedContainers = getResources();
    if (allocatedContainers != null && allocatedContainers.size() > 0) {
      // 分配资源
      scheduledRequests.assign(allocatedContainers);
    }
    ...
  }

getResources()会调用RMContainerRequestor的makeRemoteRequest()方法,这里会把之前存入ask中的资源请求取出并发送

protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    ...
    // 从ask中取出资源请求
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList(ask),
          new ArrayList(release), blacklistRequest);
    // 发送资源请求
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    ...
    return allocateResponse;
  }

接下来的链路如下:
scheduler.allocate -> ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate()

DefaultAMSProcessor.allocate()
这里会区分具体使用的调度器

allocation = getScheduler().allocate(appAttemptId, ask,
            request.getSchedulingRequests(), release,
            blacklistAdditions, blacklistRemovals, containerUpdateRequests);
FairScheduler的资源分配过程

FairScheduler.allocate()
这个方法比较长,关键的逻辑是把资源请求ask更新到FSAppAttempt,并从FSAppAttempt里获取新的container

public Allocation allocate(ApplicationAttemptId appAttemptId,
      List ask, List schedulingRequests,
      List release, List blacklistAdditions,
      List blacklistRemovals, ContainerUpdates updateRequests) {
    // Make sure this application exists
    FSAppAttempt application = getSchedulerApp(appAttemptId);
    ...
	// Update application requests
    application.updateResourceRequests(ask);
    ...
    List newlyAllocatedContainers =
        application.pullNewlyAllocatedContainers();
    ...
	return new Allocation(newlyAllocatedContainers, headroom,
        preemptionContainerIds, null, null,
        updatedNMTokens, null, null,
        application.pullNewlyPromotedContainers(),
        application.pullNewlyDemotedContainers(),
        previousAttemptContainers, null);
}

FSAppAttempt.java
接下来是FairScheduler接收节点心跳最终分配container的逻辑

private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    ...

    Collection keysToTry = (reserved) ?
        Collections.singletonList(
            node.getReservedContainer().getReservedSchedulerKey()) :
        getSchedulerKeys();

    writeLock.lock();
    try {
      // 这里schedulerKey是请求类型的封装(eg : map、reduce、failed map)
      for (SchedulerRequestKey schedulerKey : keysToTry) {
        ...
        // 调度机会+1,用来计算为实现本地性而放弃的调度次数
        addSchedulingOpportunity(schedulerKey);
        // 获取当前节点所在机架上等待的资源请求
        PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getRackName());
        // 获取当前节点上等待的资源请求
        PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getNodeName());

        ...
        // 允许的本地化级别
        NodeType allowedLocality;
        // isContinuousSchedulingEnabled由参数yarn.scheduler.fair.continuous-scheduling-enabled决定,默认为false
        if (scheduler.isContinuousSchedulingEnabled()) {
          allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
              scheduler.getNodeLocalityDelayMs(),
              scheduler.getRackLocalityDelayMs(),
              scheduler.getClock().getTime());
        } else {
          // 调用getAllowedLocalityLevel方法
          allowedLocality = getAllowedLocalityLevel(schedulerKey,
              scheduler.getNumClusterNodes(),
              // yarn.scheduler.fair.locality.threshold.node 默认为-1.0f
              scheduler.getNodeLocalityThreshold(),
              // yarn.scheduler.fair.locality.threshold.rack 默认为-1.0f
              scheduler.getRackLocalityThreshold());
        }
        
        // 如果当前节点存在资源请求,则直接分配container,实现了节点本地性调度
        if (rackLocalPendingAsk.getCount() > 0
            && nodeLocalPendingAsk.getCount() > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
              reserved, schedulerKey);
        }

        if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
          continue;
        }
		
		// 如果当前节点所在机架存在资源请求,且allowedLocality为RACK_LOCAL或OFF_SWITCH,则分配container,实现了机架本地性调度
        if (rackLocalPendingAsk.getCount() > 0
            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
            .equals(NodeType.OFF_SWITCH))) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: RACK_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
              reserved, schedulerKey);
        }
        // 获取所有等待的资源请求
        PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
            ResourceRequest.ANY);
        if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
          continue;
        }
      
        if (offswitchAsk.getCount() > 0) {
          // 如果allowedLocality为OFF_SWITCH,则分配container,此时没有实现本地调度,为随机调度
          if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
              <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assign container on " + node.getNodeName()
                  + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
                  + allowedLocality + ", priority: "
                  + schedulerKey.getPriority()
                  + ", app attempt id: " + this.attemptId);
            }
            return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
                reserved, schedulerKey);
          }
        }

        ...
      }
    } finally {
      writeLock.unlock();
    }

    return Resources.none();
  }

getAllowedLocalityLevel方法

NodeType getAllowedLocalityLevel(
      SchedulerRequestKey schedulerKey, int numNodes,
      double nodeLocalityThreshold, double rackLocalityThreshold) {
    // nodeLocalityThreshold和rackLocalityThreshold上限为1
    if (nodeLocalityThreshold > 1.0) {
      nodeLocalityThreshold = 1.0;
    }
    if (rackLocalityThreshold > 1.0) {
      rackLocalityThreshold = 1.0;
    }

    // 如果nodeLocalityThreshold或rackLocalityThreshold小于0,不启用本地调度,可以调度到任意节点
    // If delay scheduling is not being used, can schedule anywhere
    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    writeLock.lock();
    try {

      // 默认级别为节点本地 NODE_LOCAL
      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
        return NodeType.NODE_LOCAL;
      }

      NodeType allowed = allowedLocalityLevel.get(schedulerKey);

      // 如果已经是随机调度OFF_SWITCH,直接返回
      if (allowed.equals(NodeType.OFF_SWITCH)) {
        return NodeType.OFF_SWITCH;
      }

      double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
          nodeLocalityThreshold :
          rackLocalityThreshold;

      // 当放弃的调度机会超出阈值时,降低本地性级别
      int schedulingOpportunities = getSchedulingOpportunities(schedulerKey);
      double thresholdNum = numNodes * threshold;
      if (schedulingOpportunities > thresholdNum) {
        // 节点本地降级到机架本地
        if (allowed.equals(NodeType.NODE_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
                + ", nodeLocalityThreshold: " + thresholdNum
                + ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
          resetSchedulingOpportunities(schedulerKey);
        // 机架本地降级到随意调度
        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
                + ", rackLocalityThreshold: " + thresholdNum
                + ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
          resetSchedulingOpportunities(schedulerKey);
        }
      }
      return allowedLocalityLevel.get(schedulerKey);
    } finally {
      writeLock.unlock();
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316622.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号