栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark第二天的学习

Spark第二天的学习

Spark第二天学习     算子(多文件)         行动算子             foreachPartition:遍历的数据是每个partition的数据         转换算子             union                 合并两个数据集,然后两个数据集的类型要一致             转换算组join                 leftOuterJoin rightOuterJoin fullOuterJoin 这些 join 都是作用在 K,V 格式的 RDD 上。根据 key 值进行连接,例如: (K,V)join(K,W)返回(K,(V,W))。这里我感觉就是因为K相同,然后合并了下 注意: join 后的分区数与父RDD分区数多的那一个相同。             intersection                 取两个数据集的交集             subtract                 取两个数据集的差集             mapPartitions                 与map类似,单位是每个partition上的数据             distinct(map+reduceByKey+map)                 对RDD内的数据去重             cogroup                 当调用类型(k,v)和(k,w)的数据上时,返回一个数据集 (K,(Iterable,Iterable)) 。     宽窄依赖         RDD之间有一系列的依赖关系,分为窄依赖与宽依赖             窄依赖                 父RDD与子RDD之间的关系是一对一的,或者是多对一。简单说,就是无论前面有多少,我这里只有一个给你。且不会有shuffle的产生             宽依赖                 父RDD与子RDD之间是一对多的。会有shuffle产生         宽窄依赖图理解                  Stage         spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler , DAGScheduler 会把 DAG 划分成相互依赖的多个 stage ,划分 stage 的依据就是 RDD 之间的宽窄依赖。遇到宽依赖就划分 stage ,每个 stage 包含一个或多个 task 任务。然后将这些 task 以 taskSet 的形式提交给 TaskScheduler 运行。 stage 是由一组并行的 task 组成。         stage切割规则             切割规则:从后往前,遇到宽依赖就切割stage                 1.从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中; 2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的; 3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask; 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤;             总结:由于spark中stage的划分是根据shuffle来划分的,而宽依赖必然有shuffle过程,因此可以说spark是根据宽窄依赖来划分stage                          stage计算模式             1.pipeline 管道计算模式, pipeline 只是一种计算思想、模式。 2.在spark中pipeline是一个partition对应一个partition,所以在stage内部只有窄依赖。 3.数据一直在管道里面什么时候数据会落地? 对 RDD 进行持久化( cache , persist )。 shuffle write 的时候。 4.Stage 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定的 。 如何改变 RDD 的分区数? reduceByKey(XXX,3) GroupByKey(4) sc.textFile(path,numpartition) 5.使用算子时传递 分区num参数 就是分区 partition 的数量。 6.测试验证 pipeline 计算模式                              val conf = new SparkConf() conf.setMaster("local").setAppName("pipeline"); val sc = new SparkContext(conf) val rdd = sc.parallelize(Array(1,2,3,4)) val rdd1 = rdd.map { x => {  println("map--------"+x)  x }} val rdd2 = rdd1.filter { x => {  println("fliter********"+x)  true } } rdd2.collect() sc.stop()     算子(分区)         转换算子             mapPartitionsWithIndex 类似于 mapPartitions ,除此之外还会携带分区的索引值             repartition 增加或减少分区。此算子会产生 shuffle             coalesce coalesce 常用来减少分区,算子中第二个参数是减少分区的过程中是否产生 shuffle 。 true 为产生 shuffle , false 不产生 shuffle 。默认是 false 。 如果 coalesce 设置的分区数比原来的 RDD 的分区数还多的话,第二个参数设置为 false 不 会起作用(转换之后分区数大于之前),如果设置成 true ,效果和 repartition 一样。 repartition(numPartitions) = coalesce(numPartitions,true)             groupByKey 作用在 K,V 格式的 RDD 上。根据 Key 进行分组。作用在 (K,V) ,返回 (K,Iterable) 。             zip 将两个 RDD 中的元素( KV格式/非KV格式 )变成一个 KV 格式的 RDD ,两个 RDD 的个数必须相同。             zipWithIndex 该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从0开始)组合成 (K,V) 对。         行动算子             countByKey 作用到 K,V 格式的 RDD 上,根据 Key 计数相同 Key 的数据集元素。             countByValue 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。             reduce 根据聚合逻辑聚合数据集中的每个元素。     Spark资源调度和任务调度         调度流程             启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master 掌握了集群资源情况。 当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向 无环图。 任务提交后, Spark 会在 Driver 端创建两个对象: DAGScheduler 和 TaskScheduler , DAGScheduler 是任务调度的高层调度器,是一个对象。 DAGScheduler 的主要作用就是将 DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage ,然 后将这些 Stage 以 TaskSet 的形式提交给 TaskScheduler ( TaskScheduler 是任务调度的低层 调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中 的并行的 task 任务)。 TaskSchedule 会遍历 TaskSet 集合,拿到每个 task 后会将 task 发送到 Executor 中去执行(其 实就是发送到 Executor 中的线程池 ThreadPool 去执行)。 task 在 Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试3次。如果重试3次依 然失败,那么这个 task 所在的 stage 就失败了。 stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到 TaskScheduler , Stage 默 认重试4次。如果重试4次以后依然失败,那么这个 job 就失败了。 job 失败了, Application 就 失败了。 TaskScheduler 不仅能重试失败的 task ,还会重试 straggling (落后,缓慢) task ( 也就是执 行速度比其他task慢太多的task )。如果有运行缓慢的 task 那么 TaskScheduler 会启动一个新的 task 来与这个运行缓慢的 task 执行相同的处理逻辑。两个 task 哪个先执行完,就以哪个 task 的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行 可以通过 spark.speculation 属性来配置。 注意: 对于 ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有 task 重新启动处理相同 的逻辑,任务可能一直处于处理不完的状态。             流程图解                          粗细粒度资源申请             粗粒度资源申请(Spark)                 在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度, 当所有的 task 执行完成后,才会释放这部分资源。 优点:在 Application 执行之前,所有的资源都申请完毕,每一个 task 直接使用资源就可以了, 不需要 task 在执行前自己去申请资源, task 启动就快了, task 执行快了, stage 执行就快了, job 就快了, application 执行就快了。 缺点:直到最后一个 task 执行完成才会释放资源,集群的资源无法充分利用。              细粒度资源申请(MR)                 Application 执行之前不需要先去申请资源,而是直接执行,让 job 中的每一个 task 在执行前自己去申请资源, task 执行完成就释放资源。 优点:集群的资源可以充分利用。 缺点: task 自己去申请资源, task 启动变慢, Application 的运行就响应的变慢了。              SparkShuffle         SparkShuffle概念             reduceByKey会将上一个RDD中的每一个Key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value             问题                 聚合之前,每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为内RDD是分布式的弹性数据集,RDD的partition极有可能分布在不同的节点上             如何聚合                 Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。                 Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。             Spark中有两种Shuffle类型,HashShuffle和SortShuffle                 Spark1.2之前是HashShuffle Spark1.2引入SortShuffle spark2.0就只有sortshuffle。          HashShuffle             执行流程: 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。 每个buffer文件最后对应一个磁盘小文件。 reduce task来拉取对应的磁盘小文件。 总结: map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。 产生的磁盘小文件的个数: M(map task的个数)*R(reduce task的个数) 产生的磁盘小文件过多,会导致以下问题: 在Shuffle Write过程中会产生很多写磁盘小文件的对象。 在Shuffle Read过程中会产生很多读取磁盘小文件的对象。 在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存的话,就会OOM。 在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一 旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败, TaskScheduler不负责重试,由DAGScheduler负责重试Stage。                          合并机制             执行流程: 合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默 认值为false,将其设置为true即可开启优化机制。 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出 现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与 下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少 个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的 磁盘文件内。 假设第一个stage有50个task,第二个stage有100个task,总共还是有10个Executor,每个 Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会 产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数 量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。 总结 产生磁盘小文件的个数: C(core的个数)*R(reduce的个数)                           SortShuffle             执行流程 map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5M 。 在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的 数据超过 5M 时,比如现在内存结构中的数据为 5.01M ,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构。 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。 在溢写之前内存结构中的数据会进行排序分区 然后开始溢写磁盘,写磁盘是以 batch 的形式去写(批量),一个 batch 是1万条数据。 map task 执行完成后,会将这些 磁盘小文件 合并成一个大的磁盘文件,同时生成一个 索引文件 。 reduce task 去 map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。 总结 产生磁盘小文件的个数:  2*M(map task的个数)                          bypass机制             bypass 运行机制的触发条件如下: shuffle reduce task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 的参数 值。这个值默认是 200 。 不需要进行 map 端的预聚合,比如 groupBykey , join 。 产生的磁盘小文件为: 2*M(map task的个数) 。                          Shuffle文件寻址               MapOutputTracker MapOutputTracker 是 Spark 架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。 MapOutputTrackerMaster 是主对象,存在于 Driver 中。 MapOutputTrackerWorker 是从对象,存在于 Excutor 中。 2)  BlockManager BlockManager :块管理者,是 Spark 架构中的一个模块,也是一个主从架构。 BlockManagerMaster ,主对象,存在于Driver中。 BlockManagerWorker ,从对象,存在于Excutor中。 1.  BlockManagerMaster 会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知 BlockManagerSlave 传输或者删除数据。 2.  BlockManagerWorker ,从对象,存在于 Excutor 中。 3.  BlockManagerWorker 会与 BlockManagerWorker 之间通信。 无论在 Driver 端的 BlockManager 还是在 Excutor 端的 BlockManager 都含有四个对象: 1.  DiskStore :负责磁盘的管理。 2.  MemoryStore :负责内存的管理。 3.  · :负责连接其他的 BlockManagerWorker 。 4.  BlockTransferService :负责数据的传输。 3)  Shuffle 文件寻址图---看左边                                                Shuffle 文件寻址流程 1. 当 map task 执行完成后,会将 task 的执行情况和磁盘小文件的地址封装到 MapStatus 对象中, 通过 MapOutputTrackerWorker 对象向 Driver 中的 MapOutputTrackerMaster 汇报。 2. 在所有的 map task 执行完毕后, Driver 中就掌握了所有的磁盘小文件的地址。 3. 在 reduce task 执行之前,会通过 Excutor 中 MapOutPutTrackerWorker 向 Driver 端的 MapOutputTrackerMaster 获取磁盘小文件的地址。 4. 获取到磁盘小文件的地址后,会通过 BlockManager 中的 ConnectionManager 连接数据所在节点 上的 ConnectionManager ,然后通过 BlockTransferService 进行数据的传输。 5.  BlockTransferService 默认启动 5 个 task 去节点拉取数据。默认情况下, 5 个 task 拉取数据 量不能超过 48M 。 注意: spark1.2 之前没有 SortShuffle 。 spark1.2-spark1.6 之间是有 HashShuffle 和 SortShuffle 的。 spark2.0 之后就只有 SortShuffle 了, HashShuffle 被移除了。                          Shuffle调优             在代码中不推荐使用,硬编码                 new SparkConf().set(“spark.shuffle.file.buffer”,”64”)             在提交spark任务的时候,推荐使用。                 spark-submit --conf spark.shuffle.file.buffer=64 –-conf ….             在 conf 下的 spark-default.conf 配置文件中,不推荐,因为是写死后所有应用程序都要用。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651028.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号