栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark源码跟踪(九)就近计算

spark源码跟踪(九)就近计算

Spark就近计算

一,如何实现

1.1,描述1.2,过程 二,Container首选位置放置策略

2.1,规则描述2.2,实例描述 三,本地性任务信息的生成部分源码跟踪

一,如何实现 1.1,描述

hostToLocalTaskCounts:首选机器及其上可能运行的任务数的映射
numLocalityAwareTasks:有本地计算需求的任务数量
YarnAllocator.scala

// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

// Number of tasks that have locality preferences in active stages
private[yarn] var numLocalityAwareTasks: Int = 0

Spark就近计算是通过就近申请Container来实现的,根据数据分布情况,生成一个节点与节点可能运行的task数量映射表hostToLocalTaskCounts。Spark使用Container首选位置放置策略根据申请的Container数量,有本地性要求的任务数量,hostToLocalTaskCounts,主机与Container映射表(可用资源数量)等参数,生成ContainerRequest对象提交给yarn申请资源Container。

1.2,过程

1,
ApplicationMaster进程中Main线程中,启动Driver线程,初始化SparkContext对象后Driver阻塞,Main线程唤醒,开始根据用户配置的资源项,尝试向yarn申请资源,此时申请的资源与本地计算毫无关系(Driver线程中才有用户的业务代码)。第一次资源申请结束后(申请资源包括申请到后启动Executor进程),启动Reporter后台线程(setDaemon(true)),在Application结束之前Reporter线程每200ms循环执行一次,维护与yarn的心跳,动态申请资源或者释放不需要的资源,唤醒Driver线程。

2,
Driver线程唤醒后执行用户业务代码,在每个job的每个stage的提交过程中,根据数据分配情况生成hostToLocalTaskCounts和 numLocalityAwareTasks,通过Main线程中的后台通讯模块传递给封装资源申请的YarnAllocator对象,YarnAllocator根据这些信息结合Container首选位置放置策略和资源情况,计算出满足要求的Container申请列表,申请特点的Container和释放不满足要求的Container。

二,Container首选位置放置策略

具体规则实现类为LocalityPreferredContainerPlacementStrategy:

2.1,规则描述

该策略通过考虑待处理task的节点比例、所需的core/containers以及当前已存在和待分配containers的位置来计算YARN containers的最优位置。该算法的目标是最大限度地增加本地运行的tasks数。

2.2,实例描述

假设有这样一个场景,我们有20个tasks需要分配给hosts1,、host2和host3三台主机,10个tasks需要分配给host1、host2和host4,每个container有2个core,每个task占用一个cpu,那么总共需要15个containers,主机比例为(host1: 30, host2: 30, host3: 20, host4: 10),也就是3 : 3 : 2 : 1。

如果请求的container个数(18)比所需container数大(15),对应分配比例如下:
向节点(host1, host2, host3, host4)请求5个containers;
向节点(host1, host2, host3)请求5个containers;
向节点(host1, host2)请求5个containers;
剩下的3个container没有任何本地优先分配;

这种情况下的放置比例为3 : 3 : 2 : 1。

如果请求的container个数(10)比所需container数小(15),对应分配比例如下:
向节点(host1, host2, host3, host4)请求4个containers;
向节点(host1, host2, host3)请求3个containers;
向节点(host1, host2)请求3个containers;

这种情况下的放置比例为10 : 10 : 7 : 4,接近于3 : 3 : 2 : 1

如果存在可用的containers,没有一个可以满足请求的本地性,分配规则遵循上面两种情况。

如果存在可用的containers,而且其中部分containers可以满足请求的本地性。
例如,每个节点上可分配1个满足条件的container:(host1: 1, host2: 1: host3: 1, host4: 1),但是期望每个节点分配的container个数为:(host1: 5, host2: 5, host3: 4, host4: 2),那么每个节点上新请求的容器个数为: (host1: 4, host2: 4, host3: 3, host4: 1)。

如果要请求的containers个数(18)多于要求的containers个数(4+4+3+1=12),遵循规则1,比例为4 : 4 : 3 : 1
如果要请求的containers个数(10)多于要求的containers个数(4+4+3+1=12),遵循规则1,比例为4 : 4 : 3 : 1
如果存在可用的containers,且现有的container本地性可以完全满足所需的本地性要求。
例如,如果每个节点上都有5个container:(host1: 5, host2: 5, host3: 5, host4: 5),可以满足当前请求所需的本地性。

三,本地性任务信息的生成部分源码跟踪

hostToLocalTaskCounts:首选机器及其上运行的可能任务数的映射
numLocalityAwareTasks:有本地计算需求的任务数量

肯定是在stage提交task的地方。
跟踪job提交的过程。
DAGScheduler.scala

一路跟踪,最后到了一个stage中最后还是到了RDD.scala

final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}



protected def getPreferredLocations(split: Partition): Seq[String] = Nil

并不是所有task都有本地计算的需求的,比如内存中创建的RDD。找一个有本地计算需求的RDD如HadoopRDD,查看该方法的重写,可以看出一个task计算的首选位置由其分区中的文件块所在的位置决定。(sparkContext.textFile())


总结:一个RDD partition的最佳计算位置信息获取规则在RDD类中自定义实现,其具体的值在对应的task对象生成时计算确定。这些信息肯定是通过后台通讯系统传递给YarnAllocator,但是在哪调用的还没摸清楚。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745730.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号