1. client运行脚本提交命令。 2. SparkSubmit实例化SparkSubmitArguments进行参数解析。 3. SparkSubmit实例化YarnClusterApplication并创建客户端。 4. 在客户端中封装YarnClient信息,包含提交参数和命令。 5. 将信息提交给RM。 6. RM向NM的yarnRMClient发送消息,启动APPmaster。 7. NM分配资源生成APPmaster,并启动Driver线程。 8. 执行代码,初始化SparkContext。 9. APPmaster向RM申请注册,要求分配资源。 10. RM向APPmaster返回可使用资源列表。 11. APPmaster实例化工作线程池launcherPool。 12. 工作线程池launcherPool中实例化ExecutorRunnalbe。 13. ExecutorRunnalbe向NM2的RPC申请启动Executor。 14. Driver的RPC和Executor的RPC自启动。 15. Executor的RPC向Driver的RPC通信,申请注册Executor。 16. Driver的RPC向Executor的RPC通信,返回注册成功。 17. NM2启动CoarseGrainedExecutorBackend进程作为Executor的守护进程,并实例化Executor,实例化线程池。 18. Executor的RPC向Driver的RPC通信,Executor启动成功,可以运行任务。 19. Driver进行任务切分。 20. Driver进行任务分配。宽窄依赖的区别,哪些算子有shuffle
窄依赖:父RDD的一个分区最多只能被一个子RDD使用。
宽依赖:父RDD的一个分区可以被多个子RDD使用。
引起shuffle的算子。
| 算子名称 | 转换 | 功能 |
|---|---|---|
| sortBy | 使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。 | |
| sortByKey | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD | |
| reduceByKey | 对K-V类型的RDD按照Key对value进行聚合。 | |
| join | (K,V).join((K,W)) => (K,(K,W)) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
| leftOuterJoin | 左表不管有没有与右表相同的key,左表都全部显示 | |
| rightOuterJoin | 右表不管有没有与左表相同的key,左表都全部显示 | |
| fullOuterJoin | 返回左右数据集的全部数据,左右有一边不存在的数据以None填充 | |
| distinct | 对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。 | |
| cogroup | (K,V).cogroup((K,W)) => (K,(Iterable,Iterable)) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
| repartition | 根据分区数,重新通过网络随机洗牌所有数据,百分百进行shuffle。 | |
| groupByKey | 对K-V类型的RDD按照Key对value分组。 |
从整体功能上看
两者没有太大区别,都是将Map端数据重新分发至Reduce端的过程。
从数据流来看
两者有较大区别,MR是先分区排序,之后在磁盘上归并排序,reduce拉取后再进行归并排序。Spark之前默认的是hash-based,使用HashMap进行合并,但是不会提前排序,现在Spark使用sort-based进行shuffle,也会进行排序。
从整体流程上看
两者有较大区别,MR将处理流程明显的划分出map、spill、merge、shuffle、sort、reduce等过程,各个阶段各司其职,通过过程式编程实现每个阶段的功能。Spark并没有明确的阶段,只是通过不同的stage和一系列transformation实现各个阶段的功能。
Spark和MapReduce的异同
资源依赖的不同
Spark将中间数据放在内存中,避免数据落盘,数据计算效率高;MR将中间数据落在磁盘上,数据计算效率低,但是没有OOM风险。
容错程度的不同
Spark使用RDD来实现高容错,某一部分RDD出现问题,可以通过血缘回溯这部分数据;MR某个阶段发生问题会导致整个作业失败,需要重跑。
生态复杂程度的不同
Spark既是计算引擎,也是一个包含流计算、SQL、图计算、算法包的框架,而MR只有map-reduce两个算子。
Spark和Flink的异同
数据模型不同
Flink采用dataflow编程模式,配合事件组成数据模型,这样的模型在目前在时延和吞吐上是最优的,Spark使用RDD组成数据模型,RDD丰富的API降低了批处理的难度,但是SparkStreaming的DStream API很少,无法支撑复杂的流计算。
运行架构不同
Spark将DAG划分为不同的stage,DAG节点间有血缘关系,运行中一个stage完成后才会销毁;SparkStreaming则是对持续流划分为不同批次,定时执行不同批次的数据运算。(Spark将流当做连续的微小的批处理)
Flink使用Dataflow编程模式有统一的runtime,在Flink的流处理中,一个事件在一个节点处理完后输出就可以发到下一个节点立即处理,这样不会引入额外的延迟。
时延和吞吐不同
Flink是亚秒级别时延,SparkStreaming是秒级别时延。Flink吞吐量是SparkStreaming的1.X倍。
反压不同
Flink下游阻塞后,会将压力逐级上传直到数据源。Spark则是设置反压的吞吐量,到达阈值后开始限流。
一致性语义支持不同
Flink提供exactly-once语义,Spark提供at-least-once语义。
对状态的支持不同
Flink提供完整的状态管理,基于状态提供局部恢复快照。Spark通过RDD的API进行状态管理,因为Spark面向大数据集,快照不宜太频繁。
Spark调优讲解并举例
减少Spark中运行的数据库连接数。
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
优化TopN的获取方式
(1)取出所有的key进行迭代,每取出一个key利用排序算子进行排序。
(2)对key值建立自定义分区器,使不同的key进入到不同的分区,对每个分区进行排序。
降低map数
对map进行coalesce操作,降低map数量。
调高reduce缓存
spark.reduce.maxSizeInFilght 参数控制reduceTask一次可以拉取多少数据量。
spark.shuffle.file.buffer 参数控制shuffle文件输出流的内存缓冲区大小。
Spark数据倾斜讲解并举例 数据倾斜表现OOMshuffle报错单个Executor执行时间过长任务突然失败 数据倾斜原因
根:key值分布不均匀
内因:建表规范缺失
两张表做join时,关联字段存在对同一属性的不同散列声明,会导致作业直接卡死。
外因:业务数据激增
因为活动等因素导致某几个key值对应数据量激增从而导致数据倾斜。
定位数据倾斜代码 某个task执行特别慢的情况首先明确数据倾斜发生在哪个stage中,可以通过Spark web UI来看当前运行到了第几个stage,分析当前执行的stage有几个task,每个task运行时间和分配数据量,来判断是不是这个stage出现的问题。
之后通过阅读作业代码判断是哪个宽依赖算子引起的问题,根据数据来定位引起数据倾斜的算子。
某个task莫名其妙内存溢出的情况直接阅读日志,日志中会清晰的标明哪一行引起的内存溢出。
对key值采样分析分布情况先试用pairs采样10%的样本数据,之后使用countByKey算子统计出每个key出现的次数。
导致数据倾斜的原因是Hive表Hive预先对数据按照key进行聚合或其他操作,spark直接使用梳理好的数据。
但是没有从根本解决数据倾斜的问题,只能适用于短期解决问题的场景。
过滤少数导致倾斜的key对于少数几个数据量特别大的key,且对计算结果不是特别重要的话,可以过滤掉。
这种方式适用场景较少。
提升shuffle并行度给shuffle算子传入一个参数,比如reduceByKey(1000),该参数会增加该算子的并行度。
该方法不能解决数据倾斜,只能缓解数据倾斜带来的影响。
两阶段聚合(局部聚合+全局聚合)对聚合类算子和分组聚合时,比较适用。
首先进行局部聚合,对key值打上一个固定范围的随机数前缀,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1),之后进行聚合。
之后将随机数前缀去掉,在此进行全局聚合。
通常可以从根本上解决数据倾斜的问题,但是只能适用于聚合类业务。
将reduce join 转换为 map join使用broadcast变量和map类算子进行join操作,避免shuffle类操作,从根本上避免数据倾斜的发生和出现。将小表的数据通过collect算子拉取到driver端的内存中,然后创建一个Broadcast变量,广播给Executor节点。接着对另外一个RDD执行map类算子,使用广播变量获取较小的RDD全量数据,与当前RDD每一条数据按照key进行比对,如果连接的key相同,那就把这两个RDD的数据连接起来。
只适用于一个小表和一个大表进行join操作时引起的数据倾斜。
采样倾斜key并拆分join首先采样提取出几个数据量较大的key,给这几个可以打上随机数前缀,这样拆分数量大的key值,两边进行正常join,最后union即可。
适用于两个大表进行join操作,其中一方有数据倾斜,另一方数据分布较为均匀。
使用随机前缀和扩容RDD进行join将有大量数据倾斜的key的RDD,打上1-100以内的随机数前缀。将另一个key分布较为均匀的RDD膨胀100倍,两个处理后的RDD进行join即可。
以空间解决数据倾斜。



