一,如何实现
1.1,描述1.2,过程 二,Container首选位置放置策略
2.1,规则描述2.2,实例描述 三,本地性任务信息的生成部分源码跟踪
一,如何实现 1.1,描述hostToLocalTaskCounts:首选机器及其上可能运行的任务数的映射
numLocalityAwareTasks:有本地计算需求的任务数量
YarnAllocator.scala
// A map to store preferred hostname and possible task numbers running on it. private var hostToLocalTaskCounts: Map[String, Int] = Map.empty // Number of tasks that have locality preferences in active stages private[yarn] var numLocalityAwareTasks: Int = 0
Spark就近计算是通过就近申请Container来实现的,根据数据分布情况,生成一个节点与节点可能运行的task数量映射表hostToLocalTaskCounts。Spark使用Container首选位置放置策略根据申请的Container数量,有本地性要求的任务数量,hostToLocalTaskCounts,主机与Container映射表(可用资源数量)等参数,生成ContainerRequest对象提交给yarn申请资源Container。
1.2,过程1,
ApplicationMaster进程中Main线程中,启动Driver线程,初始化SparkContext对象后Driver阻塞,Main线程唤醒,开始根据用户配置的资源项,尝试向yarn申请资源,此时申请的资源与本地计算毫无关系(Driver线程中才有用户的业务代码)。第一次资源申请结束后(申请资源包括申请到后启动Executor进程),启动Reporter后台线程(setDaemon(true)),在Application结束之前Reporter线程每200ms循环执行一次,维护与yarn的心跳,动态申请资源或者释放不需要的资源,唤醒Driver线程。
2,
Driver线程唤醒后执行用户业务代码,在每个job的每个stage的提交过程中,根据数据分配情况生成hostToLocalTaskCounts和 numLocalityAwareTasks,通过Main线程中的后台通讯模块传递给封装资源申请的YarnAllocator对象,YarnAllocator根据这些信息结合Container首选位置放置策略和资源情况,计算出满足要求的Container申请列表,申请特点的Container和释放不满足要求的Container。
具体规则实现类为LocalityPreferredContainerPlacementStrategy:
该策略通过考虑待处理task的节点比例、所需的core/containers以及当前已存在和待分配containers的位置来计算YARN containers的最优位置。该算法的目标是最大限度地增加本地运行的tasks数。
2.2,实例描述假设有这样一个场景,我们有20个tasks需要分配给hosts1,、host2和host3三台主机,10个tasks需要分配给host1、host2和host4,每个container有2个core,每个task占用一个cpu,那么总共需要15个containers,主机比例为(host1: 30, host2: 30, host3: 20, host4: 10),也就是3 : 3 : 2 : 1。
如果请求的container个数(18)比所需container数大(15),对应分配比例如下:
向节点(host1, host2, host3, host4)请求5个containers;
向节点(host1, host2, host3)请求5个containers;
向节点(host1, host2)请求5个containers;
剩下的3个container没有任何本地优先分配;
这种情况下的放置比例为3 : 3 : 2 : 1。
如果请求的container个数(10)比所需container数小(15),对应分配比例如下:
向节点(host1, host2, host3, host4)请求4个containers;
向节点(host1, host2, host3)请求3个containers;
向节点(host1, host2)请求3个containers;
这种情况下的放置比例为10 : 10 : 7 : 4,接近于3 : 3 : 2 : 1
如果存在可用的containers,没有一个可以满足请求的本地性,分配规则遵循上面两种情况。
如果存在可用的containers,而且其中部分containers可以满足请求的本地性。
例如,每个节点上可分配1个满足条件的container:(host1: 1, host2: 1: host3: 1, host4: 1),但是期望每个节点分配的container个数为:(host1: 5, host2: 5, host3: 4, host4: 2),那么每个节点上新请求的容器个数为: (host1: 4, host2: 4, host3: 3, host4: 1)。
如果要请求的containers个数(18)多于要求的containers个数(4+4+3+1=12),遵循规则1,比例为4 : 4 : 3 : 1
如果要请求的containers个数(10)多于要求的containers个数(4+4+3+1=12),遵循规则1,比例为4 : 4 : 3 : 1
如果存在可用的containers,且现有的container本地性可以完全满足所需的本地性要求。
例如,如果每个节点上都有5个container:(host1: 5, host2: 5, host3: 5, host4: 5),可以满足当前请求所需的本地性。
hostToLocalTaskCounts:首选机器及其上运行的可能任务数的映射
numLocalityAwareTasks:有本地计算需求的任务数量
肯定是在stage提交task的地方。
跟踪job提交的过程。
DAGScheduler.scala
一路跟踪,最后到了一个stage中最后还是到了RDD.scala
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
并不是所有task都有本地计算的需求的,比如内存中创建的RDD。找一个有本地计算需求的RDD如HadoopRDD,查看该方法的重写,可以看出一个task计算的首选位置由其分区中的文件块所在的位置决定。(sparkContext.textFile())
总结:一个RDD partition的最佳计算位置信息获取规则在RDD类中自定义实现,其具体的值在对应的task对象生成时计算确定。这些信息肯定是通过后台通讯系统传递给YarnAllocator,但是在哪调用的还没摸清楚。



