栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkShuffle解析

SparkShuffle解析

Spark两种核心Shuffle:

 

HashShuffle

SortShuffle

一、HashShuffle 1.未经优化的HashShuffle

 在shuffleWrite阶段,也就是上层,每个task都会根据key进行hash划分,从而让相同hash值的key进入同一个blockfile文件,以供shuffleRead阶段的task拉取(溢写到磁盘形成blockfile之前,会先写入内存缓冲区中,填满后才可以溢写)。而blockfile的个数取决于下一个stage有多少个task,下游stage有多少个task,那么上游stage的每个task就会创建多少个blockfile。例如上图中,上游stage有4个task,下游有是哪个task,那么就一共会产生12个blockfile。

2.经过优化的HashShuffle

 为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制。此时会出现shuffleFileGroup的概念,同一个Executor中的task会将数据写到同一个shuffleFileGroup

优化后的HashShuffle,主要在产生的blockfile的个数上。未经优化的blockFile个数是上游stage的task个数N乘以下游stage的task个数M。N*M个。经过优化的个数则是以executor个数X乘以下游stage的task个数M。X*M个。使得task产生的磁盘文件可以先进行一定程度上的合并,这样可以大大减少产生的磁盘文件个数。

二、SortShuffle 1.普通机制的SortShuffle

        SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

        在该模式下,数据会根据shuffle算子的类型,以不同的数据结构写入内存数据结构中。如果是reduceByKey这种算子,会用Map类型。如果是join这种算子,那就用Array类型。每写入一条数据,都会判断是否达到了溢写的阈值,如果达到了,将溢写到磁盘。再溢写之前,会根据Key对数据进行排序,每批次10000条数据写入到一个内存缓冲区,当内存缓冲区溢满之后再一起写到磁盘文件中,这样可以减少磁盘IO。

        在内存缓冲区溢写到磁盘的过程中, 会产生很多个溢写文件,最终会进行一次merge,合并成一个文件。因为一个task只产生这一个溢写文件,该文件包含了下游Stage所有task所需的数据,所以还需要生成一个索引文件,用来标记每个下游Task在该溢写文件中的的startOffset和EndOffset。

        在此种Shuffle过程中,产生溢写文件的个数根据上游Stage中的 task个数决定。假如上游Task有50个,Executor有10个,下游Stage的Task有100个,未经优化的HashShuffle需要产生 50 *100=5000个溢写文件,优化的HashShuffle需要产生10*100=1000个溢写文件。而此种shuffle只需要50个溢写文件。

2.Bypass机制的SortShuffle

 

bypass运行机制的触发条件如下:

①shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值

②不是聚合类的shuffle算子

启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487211.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号