- 案例
- shuffle write准备
- shuffle write、read
假设我们写一个wordcount的程序:
sc.
textFile("hdfs://hadoop102:8020/spark-input/word.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://hadoop102:8020/spark-out/wcout")
观看job:
划分了两个stage,因为出现了shuffle:
一共是4个task,第一个stage两个task,第二个stage两个task。
shuffle write溢写了171.0 B数据到本地磁盘。shuffle read读取这171.0 B的数据。
在/tmp目录下,我们发现了溢写的内容:
一个data文件,一个index文件。
所以,他是如何工作的?
shuffle write准备
我们从executor注册开始讲起。
new Executor的时候会准备一个线程池。
driver将task发来之后,executor就要准备执行了。
他new了一个TaskRunner然后丢到executor的线程池中执行,所以我们要看TaskRunner的run方法。
他拿到一个序列化器将task反序列化出来。
然后进入task.run。
task是spark的最小执行单元。
task有两种:ShuffleMapTask和ResultTask,对应的stage分别是ShuffleMapStage和ResultStage。
ResultTask是直接产生结果的task,而ShuffleMapTask可以从源头读数据,可以从上一个stage中溢写到磁盘的溢写文件中读数据。
在我们的wordcount案例中,textFile,flatMap,Map属于ShuffleMapStage,reduceByKey,saveAsTextFile属于ResultStage。
所以在runTask(context)这个抽象方法中,我们先走ShuffleMapTask的runTask方法:
这里,他先拿到最后一个rdd,然后拿到rdd的依赖。然后往外写。
写之前,他要拿最后一个rdd的iterator。
他问我们有没有cache,persist,或者做checkpoint,显然我们没有,所以进入compute方法:
这是个抽象方法。因为我们stage0的最后一个rdd是MapPartitionsRDD,所以看这个类的compute方法:
但是他还要往上找iterator。
一直往上找,直到
到HadoopRDD这里就要从数据源读数据了。
并且他返回的也是一个迭代器。
既然数据都读来了,然后就从前往后flatMap,Map应用函数计算。
shuffle write、read
既然要shuffle write,我们要先拿到writer。
getWriter也是抽象的,幸运的是ShuffleManager只有一个实现:SortShuffleManager。
而这个SortShuffleManager是在初始化spark环境时创建的。
使用哪一种shuffle writer,需要一个handle的模式匹配,而这个handle是传进来的:
在这里我们能知道返回哪一个ShuffleHandle。
首先看是否使用BypassMergeSortShuffleHandle。
有mapSideCombine直接不行。
因为reduceByKey调用的是combineByKeyWithClassTag:
它的mapSideCombine默认是true。
我们debug一下:
首先可以看到它的血缘关系。
然后mapSideCombine也确实为true。
所以不使用BypassMergeSortShuffleHandle。
至于是否用SerializedShuffleHandle,我们接着看:
他首先问我们的序列化器是不是支持重排序的。
java的序列化器是不支持的,kyro是支持的。
我们用的是kyro,所以看下一个条件:
又是问有没有mapSideCombine,我们的为true,所以直接跳过这个handle。
所以最后返回的就是baseShuffleHandle。
那么我们用的writer就是SortShuffleWriter。
进入insertAll。
现在就要执行map端的聚合了,将定义的对第一个数的操作逻辑和分区内的操作逻辑取出来应用:
注意到此时才会用分区器的getPartition方法。
最后得到的结果就是分区号加上区内聚合的结果。
最后有一个写入磁盘的操作。
然后删除原来的index和data文件,将tmp文件重命名,因为他溢写的操作要有很多次。
那么stage1如何去读取这些数据呢?
看ShuffledRDD中的compute方法:
这里就会有一个读出来的操作。



