- 1. 论文内容概要
- 2. 论文知识点整理
- 2.1 multi-process & multi-thread
- 2.2 Apache资源分配策略
- 2.2.1 static partitioning of resources
- 2.2.2 dynamic sharing of CPU cores
- 2.2.3 Resource Allocation Policy
- 2.2.3.1 Request Policy
- 2.2.3.2 Remove Policy
- 2.3 Apache Spark checkpoint
- 2.4 对executor、task、并行度概念的理解
- 2.5 Linux查看cpu信息
- 2.6 bin packing problem
- 2.7 Apache Spark Heartbeat
- 2.8 Block Manager
- 2.9 Yarn Client & Yarn Cluster
本文章主要考虑到了对于迭代应用,资源需求是随着迭代的次数递减的。因此,提出一种可以扩大、缩减executor数,充分利用资源的机制,很有必要,即iSpark。
Apache Spark只提供了动态增加executor数目的方式,却没有考虑到任务所需要的executor数目并不是一直增加的。更多的情况下,资源的利用率呈现出边际效益递减的规律。因此,如果可以在监听到资源利用率下降的情况下,回收一部分资源,并给另外的任务使用,那么系统整体的资源利用率将会有提升。
具体的,作者把资源分配问题视为装箱问题,因为所考虑的资源情况是多维的,因此,不能直接用降序最佳适应算法解决。作者通过先分析出主导因素,即Algorithm 1中的arg max Uj basd on DFS(Dominat Factor Scheduling)
对于Elastic Provisioning,作者通过增加了iController、iCacheManager、iMetricController、Monitor、iBlockManager,来将中间计算结果保存下来,并作为统一管理,保证了数据一致性(Data Consistency)。
文章原话
The key idea of keeping the data consistency is to preserve the intermediate results in memory before removing a running executor, i.e, to preempt a running processes safely.
在保存之后,iCahceManager会通知DAG scheduler更新RDD信息。
- iMetricCollector collects the real-time information (e.g. CPU and memory metrics) and the operation logic in- formation, i.e. RDD dependency, from DAG scheduler. Monitor will report the resources usage information of corresponding executors to iMetricCollector periodically via the system heartbeat.
- iController makes the provisioning decisions based on the metrics provided by iMetricsCollector, which will further request ExecutorAllocationManager (EAM) to perform the provisioning decision in terms of the number of executors.
- Centralized iCacheManager coordinates with iController to ensure the data consistency. iCacheManager is responsible for managing RDDs, which applies DAG-aware policy to preserve data partitions and updates the related RDD information to DAG scheduler.
- Distributed iBlockManager replicates the data blocks on these executors to be removed based on the provision- ing decision and the DAG-aware policy provided by iCacheManager.
即
iMetricCollector 监听资源变化,并把结果汇报给iController。iController决定资源伸缩。Cached RDD信息保存在iCacheManager中,具体的执行者是iBlockManager 。
此处,整理出论文中提到的“知识盲区”。
2.1 multi-process & multi-threadMapReduce是一种multi-process计算模型,我们可以通过修改JVM的资源来改变总体资源(不同任务运行在不同的JVM上)。
Apache Spark是一种在单节点上的multi-thread计算模型,因此,我们不能像修改multi-process计算模型资源那样修改Spark的资源。
在集群上运行时,每个Spark应用程序都有一组独立的执行器jvm,它们只运行该应用程序的任务和存储数据。如果多个用户需要共享您的集群,根据集群管理器的不同,可以使用不同的选项来管理分配。
2.2.1 static partitioning of resources通过这种方法,给每个应用程序提供了它可以使用的最大数量的资源,并在整个过程中保持这些资源。这是Spark的standalone模式和YARN模式以及粗粒度Mesos模式中使用的方法。
2.2.2 dynamic sharing of CPU cores在这种模式下,每个Spark应用程序仍然有一个固定且独立的内存分配(由Spark .executor.memory设置),但当应用程序不在某个机器上运行任务时,其他应用程序可能会在这些核心上运行任务。当希望有大量不太活跃的应用程序时,例如来自不同用户的shell会话,此模式非常有用。然而,它也有可能带来不可预测的延迟,因为当应用程序有工作要做时,它可能需要一段时间才能重新获得一个节点上的核心。
注意,目前没有一种模式提供跨应用程序的内存共享。如果希望以这种方式共享数据,可以运行单个服务器应用程序,通过查询相同的RDD来处理多个请求。
2.2.3 Resource Allocation Policy在较高级别上,Spark应该在不再使用executor时放弃executor,并在需要executor时获取executor。由于没有确定的方法来预测即将删除的executor是否会在不久的将来运行任务,或者即将添加的新executor是否实际处于空闲状态,因此我们需要一组启发式方法来确定何时删除和请求执行者。
2.2.3.1 Request Policy启用动态分配的Spark应用程序在有待调度的挂起任务时请求其他executor。此条件必然意味着现有的executor不足以同时饱和所有已提交但尚未完成的任务。
Spark要求executor轮换执行。实际请求在spark.DynamicLocation.schedulerBacklogTimeout秒有挂起任务时触发,如果挂起任务队列持续存在,则每隔spark.DynamicLocation.sustainedSchedulerBacklogTimeout秒再次触发一次。
此外,每轮申请的executor数量比前一轮成倍增加。例如,应用程序将在第一轮中添加1个executor,然后在随后的几轮中添加2个、4个、8个executor,依此类推。
指数增长政策的目的是双重的。首先,应用程序应该在开始时谨慎地请求executor,以防结果表明只有几个额外的executor就足够了。这与TCP慢启动的理由相呼应。其次,应用程序应该能够及时提高其资源使用率,以防实际需要许多executor。
2.2.3.2 Remove PolicySpark应用程序在executor空闲超过Spark.DynamicLocation.ExecutionIdleTimeout秒时删除executor。在大多数情况下,此条件与请求条件是互斥的,因为如果仍有待调度的挂起任务,则executor不应处于空闲状态。
2.3 Apache Spark checkpointcheckpoint的意思就是建立检查点,类似于快照。
例如在spark计算里面,计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能。
当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint。
checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)。
使用rdd.checkpoint()之前最好先rdd.cache(),因为rdd.checkpoint()自身会计算一遍没有计算的地方。
2.4 对executor、task、并行度概念的理解task:spark应用任务执行的最基本单位
并行度:一个spark应用中,每个stage之中的task数目,也是rdd分区的数目,即分区数目对应task数目
executor:spark-submit时可以设置cpu core和memory,task由一个虚拟core执行
并行度 = executor数目 * 每个executor核数
命令cat /proc/cpuinfo
cpu型号model name
装箱问题是NP问题(多项式时间内无法精确求解),常用的解法有:
普通算法
- 下次适应算法(Next Fit)
- 首次适应算法(First Fit)
- 最佳适应算法(Best Fit)
- 降序首次适应算法(First Fit Decreasing)
- 降序最佳适应算法(Best Fit Decreasing)
- 动态规划
元启发式算法
- 模拟退火算法(Simulated Annealing)
- 遗传算法(Genetic Algorithm)
心跳是分布式技术的基础,在Spark中,有一个Master和众多的worker,master借助心跳机制直到每个worker的情况。心跳除了传递信息,另一个主要的作用就是worker告诉master它还活着,当心跳停止时,方便master进行一些容错操作,比如数据转移备份等等。
2.8 Block Manager在Spark中,不论是shuffle数据还是节点本身存储的数据,都将数据抽象为block,而每个driver/executor中的block都是由BlockManager管理。对于block实际的读取,BlockManager根据block存储位置的不同交由ShuffleManager、MemoryStore或DiskStore来实际处理。
2.9 Yarn Client & Yarn Cluster区别:
Client模式将用于监控和调度的Driver模块在客户端执行,而不是在Yarn中,所以一般用于测试
Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行,一般用于生产环境。
Client模式下Driver在任务提交的本地机器上运行,启动后会和ResourceManager通讯申请启动ApplicationMaster
Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,ResourceManager在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver



