1. YARN's scheduling locality means placing a task on the node that holds its input data, which saves a great deal of network I/O. For MR jobs, only map tasks have a locality requirement; reduce tasks and failed map tasks have none.
2. YARN satisfies locality through delay scheduling. Locality has three levels: node-local, rack-local, and off-switch (any node). When a request's locality cannot be satisfied, the scheduler counts the scheduling opportunities it has passed up, waits for that count to reach a threshold, and then relaxes the locality constraint to the next level.
3. To land on a node that satisfies data locality, a request is allowed to miss a certain number of scheduling opportunities. The threshold on the number of missed opportunities is controlled by the following parameters:
FairScheduler
// Configured as a float; the effective number of missed opportunities is the value * total number of cluster nodes
yarn.scheduler.fair.locality.threshold.node    (default: -1.0f)
yarn.scheduler.fair.locality.threshold.rack    (default: -1.0f)
CapacityScheduler
// Configured as a positive integer, which is the number of missed opportunities itself
yarn.scheduler.capacity.node-locality-delay                (default: 40)
yarn.scheduler.capacity.rack-locality-additional-delay     (default: -1)
4. If YARN is deployed separately from the file system, this feature should be disabled because locality is meaningless there; setting the parameters above to -1 turns delay scheduling off.
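The two schedulers interpret their parameters differently. The following is a minimal sketch (hypothetical helper names, not YARN code) of how each turns its configuration into an effective missed-opportunity threshold:

```java
public class LocalityThreshold {
    // FairScheduler style: a fraction of the cluster size; a negative value
    // disables delay scheduling entirely.
    static int fairMissedThreshold(double thresholdFraction, int clusterNodes) {
        if (thresholdFraction < 0.0) {
            return -1; // delay scheduling disabled: schedule anywhere immediately
        }
        // the fraction is capped at 1.0, i.e. at most one full cluster scan
        return (int) (Math.min(thresholdFraction, 1.0) * clusterNodes);
    }

    // CapacityScheduler style: an absolute count of scheduling opportunities,
    // capped at the cluster size (one full scan is enough to know that
    // node locality cannot be satisfied).
    static int capacityMissedThreshold(int nodeLocalityDelay, int clusterNodes) {
        return Math.min(nodeLocalityDelay, clusterNodes);
    }

    public static void main(String[] args) {
        System.out.println(fairMissedThreshold(0.5, 100));    // 50
        System.out.println(fairMissedThreshold(-1.0, 100));   // -1 (disabled)
        System.out.println(capacityMissedThreshold(40, 100)); // 40
    }
}
```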
Container request events originate in TaskAttemptImpl.java:
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
taskAttempt.attemptId, taskAttempt.resourceCapability,
taskAttempt.dataLocalHosts.toArray(
new String[taskAttempt.dataLocalHosts.size()]),
taskAttempt.dataLocalRacks.toArray(
new String[taskAttempt.dataLocalRacks.size()])));
dataLocalHosts and dataLocalRacks come from MapTaskAttemptImpl.java:
public MapTaskAttemptImpl(TaskId taskId, int attempt,
EventHandler eventHandler, Path jobFile,
int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext) {
super(taskId, attempt, eventHandler,
// splitInfo.getLocations() returns the nodes that hold the input split
taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
jobToken, credentials, clock, appContext);
this.splitInfo = splitInfo;
}
Next, RMContainerAllocator.java handles the container request event:
private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
...
if(mapContainerRequestAccepted) {
// set the resources
reqEvent.getCapability().setMemorySize(
mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
// add the map request
scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
} else {
...
}
}
The scheduledRequests.addMap method:
void addMap(ContainerRequestEvent event) {
ContainerRequest request = null;
...
} else {
// create the container request
request =
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
// map each data-local host to the task attempt, for node-local container assignment later
for (String host : event.getHosts()) {
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list == null) {
list = new LinkedList<TaskAttemptId>();
mapsHostMapping.put(host, list);
}
list.add(event.getAttemptID());
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to host " + host);
}
}
// map each rack to the task attempt, for rack-local container assignment later
for (String rack : event.getRacks()) {
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
if (list == null) {
list = new LinkedList<TaskAttemptId>();
mapsRackMapping.put(rack, list);
}
list.add(event.getAttemptID());
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to rack " + rack);
}
}
maps.put(event.getAttemptID(), request);
// call addContainerReq
addContainerReq(request);
}
...
}
}
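The bookkeeping in addMap can be condensed into the following runnable sketch (simplified types, not YARN code: attempt IDs are plain strings and `mapsHostMapping` is an ordinary map from host name to the attempts whose input lives there):

```java
import java.util.*;

public class HostMappingSketch {
    // Simplified stand-in for RMContainerAllocator's mapsHostMapping:
    // host name -> map attempts whose input split lives on that host.
    static Map<String, List<String>> mapsHostMapping = new HashMap<>();

    static void addMap(String attemptId, String[] hosts) {
        for (String host : hosts) {
            // same get-or-create pattern as the original, written with computeIfAbsent
            mapsHostMapping.computeIfAbsent(host, h -> new LinkedList<>())
                           .add(attemptId);
        }
    }

    public static void main(String[] args) {
        addMap("attempt_0", new String[]{"node1", "node2"});
        addMap("attempt_1", new String[]{"node1"});
        System.out.println(mapsHostMapping.get("node1")); // [attempt_0, attempt_1]
    }
}
```

When a container is later allocated on some node, the allocator can look up this map by host name and pick a waiting attempt, which is exactly how node locality is realized on the AM side.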
RMContainerRequestor.java
The addContainerReq method:
protected void addContainerReq(ContainerRequest req) {
// Create resource requests
for (String host : req.hosts) {
// Data-local
if (!isNodeBlacklisted(host)) {
// add a node-local resource request
addResourceRequest(req.priority, host, req.capability,
null);
}
}
// Nothing Rack-local for now
for (String rack : req.racks) {
// add a rack-local resource request
addResourceRequest(req.priority, rack, req.capability,
null);
}
// add an ANY (off-switch) resource request
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.nodeLabelExpression);
}
Finally, addResourceRequestToAsk is called to add the resource request to ask:
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
...
ask.remove(remoteRequest);
ask.add(remoteRequest);
}
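The remove-then-add pair looks redundant but is not: ask is a set whose element equality is keyed on the request's identity (priority, resource name, etc.) rather than its container count, and Set.add is a no-op when an "equal" element is already present. A minimal sketch (hypothetical `Req` class, not YARN code) of why remove must come first:

```java
import java.util.*;

public class AskSet {
    static class Req {
        final String resourceName; // the equality key in this sketch
        int numContainers;         // the field we want to update
        Req(String name, int count) { resourceName = name; numContainers = count; }
        @Override public boolean equals(Object o) {
            return (o instanceof Req) && ((Req) o).resourceName.equals(resourceName);
        }
        @Override public int hashCode() { return resourceName.hashCode(); }
    }

    public static void main(String[] args) {
        Set<Req> ask = new HashSet<>();
        ask.add(new Req("node1", 1));
        Req updated = new Req("node1", 5);
        ask.remove(updated); // drop the stale entry with the same key
        ask.add(updated);    // add alone would be a no-op and keep the old count
        System.out.println(ask.iterator().next().numContainers); // 5
    }
}
```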
The resource request flow
The AM requests resources, and assigns what it receives, through RMContainerAllocator.heartbeat():
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
// request resources
List<Container> allocatedContainers = getResources();
if (allocatedContainers != null && allocatedContainers.size() > 0) {
// assign the allocated containers
scheduledRequests.assign(allocatedContainers);
}
...
}
getResources() calls RMContainerRequestor's makeRemoteRequest() method, which takes the resource requests previously stored in ask and sends them:
protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
...
// take the resource requests out of ask
AllocateRequest allocateRequest =
AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest);
// send the resource request
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
...
return allocateResponse;
}
The call chain from here is:
scheduler.allocate -> ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate()
DefaultAMSProcessor.allocate()
This is where the call is dispatched to the concrete scheduler in use:
allocation = getScheduler().allocate(appAttemptId, ask,
request.getSchedulingRequests(), release,
blacklistAdditions, blacklistRemovals, containerUpdateRequests);
FairScheduler's resource allocation
FairScheduler.allocate()
This method is long; the key logic updates the resource requests in ask onto the FSAppAttempt and pulls the newly allocated containers from it:
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals, ContainerUpdates updateRequests) {
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
...
// Update application requests
application.updateResourceRequests(ask);
...
List newlyAllocatedContainers =
application.pullNewlyAllocatedContainers();
...
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null,
updatedNMTokens, null, null,
application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers(),
previousAttemptContainers, null);
}
FSAppAttempt.java
Next is the logic in which FairScheduler, upon receiving a node heartbeat, finally assigns a container:
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
...
Collection keysToTry = (reserved) ?
Collections.singletonList(
node.getReservedContainer().getReservedSchedulerKey()) :
getSchedulerKeys();
writeLock.lock();
try {
// schedulerKey wraps the request type (e.g. map, reduce, failed map)
for (SchedulerRequestKey schedulerKey : keysToTry) {
...
// one more scheduling opportunity, used to count how many have been passed up for locality
addSchedulingOpportunity(schedulerKey);
// pending resource requests on this node's rack
PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
node.getRackName());
// pending resource requests on this node
PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
node.getNodeName());
...
// the locality level currently allowed
NodeType allowedLocality;
// isContinuousSchedulingEnabled is controlled by yarn.scheduler.fair.continuous-scheduling-enabled, default false
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
// call the getAllowedLocalityLevel method
allowedLocality = getAllowedLocalityLevel(schedulerKey,
scheduler.getNumClusterNodes(),
// yarn.scheduler.fair.locality.threshold.node, default -1.0f
scheduler.getNodeLocalityThreshold(),
// yarn.scheduler.fair.locality.threshold.rack, default -1.0f
scheduler.getRackLocalityThreshold());
}
// if this node has a pending request, assign a container immediately: node-local scheduling
if (rackLocalPendingAsk.getCount() > 0
&& nodeLocalPendingAsk.getCount() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: NODE_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
reserved, schedulerKey);
}
if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
continue;
}
// if this node's rack has a pending request and allowedLocality is RACK_LOCAL or OFF_SWITCH, assign a container: rack-local scheduling
if (rackLocalPendingAsk.getCount() > 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
.equals(NodeType.OFF_SWITCH))) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: RACK_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
reserved, schedulerKey);
}
// pending resource requests at any location
PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
ResourceRequest.ANY);
if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
continue;
}
if (offswitchAsk.getCount() > 0) {
// if allowedLocality is OFF_SWITCH, assign a container; locality is not achieved, i.e. off-switch scheduling
if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
<= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: OFF_SWITCH" + ", allowedLocality: "
+ allowedLocality + ", priority: "
+ schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
reserved, schedulerKey);
}
}
...
}
} finally {
writeLock.unlock();
}
return Resources.none();
}
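The three branches above collapse into one decision per heartbeat. This sketch (not YARN code; counts and the allowed level are passed in explicitly) mirrors the order in which locality levels are tried:

```java
public class LocalityDecision {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // Mirrors assignContainer's branch order: node-local first, then
    // rack-local once the allowed level has relaxed to RACK_LOCAL or beyond,
    // then off-switch only once the allowed level is OFF_SWITCH.
    static String decide(int nodeLocalAsks, int rackLocalAsks, int anyAsks,
                         NodeType allowed) {
        if (nodeLocalAsks > 0 && rackLocalAsks > 0) {
            return "NODE_LOCAL";
        }
        if (rackLocalAsks > 0
            && (allowed == NodeType.RACK_LOCAL || allowed == NodeType.OFF_SWITCH)) {
            return "RACK_LOCAL";
        }
        if (anyAsks > 0 && allowed == NodeType.OFF_SWITCH) {
            return "OFF_SWITCH";
        }
        return "NONE"; // keep waiting for a better node
    }

    public static void main(String[] args) {
        System.out.println(decide(1, 1, 1, NodeType.NODE_LOCAL)); // NODE_LOCAL
        System.out.println(decide(0, 1, 1, NodeType.NODE_LOCAL)); // NONE: still waiting
        System.out.println(decide(0, 1, 1, NodeType.RACK_LOCAL)); // RACK_LOCAL
        System.out.println(decide(0, 0, 1, NodeType.OFF_SWITCH)); // OFF_SWITCH
    }
}
```

The "NONE" outcome is the essence of delay scheduling: the heartbeat is deliberately wasted, and each wasted heartbeat moves the opportunity counter closer to the threshold that relaxes `allowed`.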
The getAllowedLocalityLevel method:
NodeType getAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, int numNodes,
double nodeLocalityThreshold, double rackLocalityThreshold) {
// nodeLocalityThreshold and rackLocalityThreshold are capped at 1
if (nodeLocalityThreshold > 1.0) {
nodeLocalityThreshold = 1.0;
}
if (rackLocalityThreshold > 1.0) {
rackLocalityThreshold = 1.0;
}
// if nodeLocalityThreshold or rackLocalityThreshold is negative, delay scheduling is disabled and placement can go anywhere
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
writeLock.lock();
try {
// the default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
// if already OFF_SWITCH, return immediately
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityThreshold :
rackLocalityThreshold;
// relax the locality level once the passed-up scheduling opportunities exceed the threshold
int schedulingOpportunities = getSchedulingOpportunities(schedulerKey);
double thresholdNum = numNodes * threshold;
if (schedulingOpportunities > thresholdNum) {
// downgrade NODE_LOCAL to RACK_LOCAL
if (allowed.equals(NodeType.NODE_LOCAL)) {
if (LOG.isTraceEnabled()) {
LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
+ ", nodeLocalityThreshold: " + thresholdNum
+ ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
+ ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey);
// downgrade RACK_LOCAL to OFF_SWITCH
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
if (LOG.isTraceEnabled()) {
LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
+ ", rackLocalityThreshold: " + thresholdNum
+ ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
+ ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(schedulerKey);
}
}
return allowedLocalityLevel.get(schedulerKey);
} finally {
writeLock.unlock();
}
}
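Put together, the downgrade behaves like a small per-schedulerKey state machine driven by missed opportunities. A runnable simulation (condensed logic, not YARN code), assuming a 10-node cluster with both thresholds set to 0.3:

```java
import java.util.*;

public class DelaySchedulingSim {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    static Map<String, NodeType> allowedLocalityLevel = new HashMap<>();
    static Map<String, Integer> schedulingOpportunities = new HashMap<>();

    // Condensed getAllowedLocalityLevel: relax one level each time the
    // missed-opportunity count exceeds numNodes * threshold, and reset
    // the counter on every downgrade (resetSchedulingOpportunities).
    static NodeType getAllowedLocalityLevel(String key, int numNodes,
                                            double nodeThr, double rackThr) {
        if (nodeThr < 0.0 || rackThr < 0.0) {
            return NodeType.OFF_SWITCH; // delay scheduling disabled
        }
        NodeType allowed = allowedLocalityLevel.getOrDefault(key, NodeType.NODE_LOCAL);
        if (allowed == NodeType.OFF_SWITCH) {
            return allowed; // already at the loosest level
        }
        double threshold = numNodes *
            (allowed == NodeType.NODE_LOCAL ? nodeThr : rackThr);
        if (schedulingOpportunities.getOrDefault(key, 0) > threshold) {
            allowed = (allowed == NodeType.NODE_LOCAL)
                ? NodeType.RACK_LOCAL : NodeType.OFF_SWITCH;
            schedulingOpportunities.put(key, 0);
        }
        allowedLocalityLevel.put(key, allowed);
        return allowed;
    }

    public static void main(String[] args) {
        // 10 nodes, threshold 0.3 -> relax after more than 3 missed opportunities
        for (int heartbeat = 1; heartbeat <= 10; heartbeat++) {
            // addSchedulingOpportunity: each heartbeat that fails to place
            // the request counts as one passed-up opportunity
            schedulingOpportunities.merge("map", 1, Integer::sum);
            NodeType allowed = getAllowedLocalityLevel("map", 10, 0.3, 0.3);
            System.out.println("heartbeat " + heartbeat + ": " + allowed);
        }
    }
}
```

With these numbers the level relaxes to RACK_LOCAL on the 4th missed opportunity and to OFF_SWITCH on the 8th, matching the counter reset that the real method performs on each downgrade.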