栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

shuffle过程

shuffle过程

shuffle
      • 案例
      • shuffle write准备
      • shuffle write、read

案例

假设我们写一个wordcount的程序:

sc.
textFile("hdfs://hadoop102:8020/spark-input/word.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://hadoop102:8020/spark-out/wcout")

观看job:

划分了两个stage,因为出现了shuffle:


一共是4个task,第一个stage两个task,第二个stage两个task。

shuffle write溢写了171.0 B数据到本地磁盘。shuffle read读取这171.0 B的数据。

在/tmp目录下,我们发现了溢写的内容:



一个data文件,一个index文件。

所以,他是如何工作的?

shuffle write准备


我们从executor注册开始讲起。

new Executor的时候会准备一个线程池。


driver将task发来之后,executor就要准备执行了。


他new了一个TaskRunner然后丢到executor的线程池中执行,所以我们要看TaskRunner的run方法。


他拿到一个序列化器将task反序列化出来。

然后进入task.run。


task是spark的最小执行单元。

task有两种:ShuffleMapTask和ResultTask,对应的stage分别是ShuffleMapStage和ResultStage。

ResultTask是直接产生结果的task,而ShuffleMapTask可以从源头读数据,可以从上一个stage中溢写到磁盘的溢写文件中读数据。

在我们的wordcount案例中,textFile,flatMap,Map属于ShuffleMapStage,reduceByKey,saveAsTextFile属于ResultStage。


所以在runTask(context)这个抽象方法中,我们先走ShuffleMapTask的runTask方法:


这里,他先拿到最后一个rdd,然后拿到rdd的依赖。然后往外写。


写之前,他要拿最后一个rdd的iterator。


他问我们有没有cache,persist,或者做checkpoint,显然我们没有,所以进入compute方法:


这是个抽象方法。因为我们stage0的最后一个rdd是MapPartitionsRDD,所以看这个类的compute方法:


但是他还要往上找iterator。

一直往上找,直到


到HadoopRDD这里就要从数据源读数据了。


并且他返回的也是一个迭代器。

既然数据都读来了,然后就从前往后flatMap,Map应用函数计算。

shuffle write、read


既然要shuffle write,我们要先拿到writer。

getWriter也是抽象的,幸运的是ShuffleManager只有一个实现:SortShuffleManager。

而这个SortShuffleManager是在初始化spark环境时创建的。


使用哪一种shuffle writer,需要一个handle的模式匹配,而这个handle是传进来的:

在这里我们能知道返回哪一个ShuffleHandle。


首先看是否使用BypassMergeSortShuffleHandle。

有mapSideCombine直接不行。

因为reduceByKey调用的是combineByKeyWithClassTag:

它的mapSideCombine默认是true。

我们debug一下:

首先可以看到它的血缘关系。


然后mapSideCombine也确实为true。

所以不使用BypassMergeSortShuffleHandle。

至于是否用SerializedShuffleHandle,我们接着看:


他首先问我们的序列化器是不是支持重排序的。

java的序列化器是不支持的,kyro是支持的。


我们用的是kyro,所以看下一个条件:


又是问有没有mapSideCombine,我们的为true,所以直接跳过这个handle。

所以最后返回的就是baseShuffleHandle。


那么我们用的writer就是SortShuffleWriter。


进入insertAll。


现在就要执行map端的聚合了,将定义的对第一个数的操作逻辑和分区内的操作逻辑取出来应用:


注意到此时才会用分区器的getPartition方法。


最后得到的结果就是分区号加上区内聚合的结果。


最后有一个写入磁盘的操作。



然后删除原来的index和data文件,将tmp文件重命名,因为他溢写的操作要有很多次。

那么stage1如何去读取这些数据呢?

看ShuffledRDD中的compute方法:

这里就会有一个读出来的操作。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342393.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号