栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark ShuffleRDD生成的两种Task

Spark ShuffleRDD生成的两种Task

Spark Core里的ShuffleRDD在阶段划分时会生成两个Stage,以下面的WordCount为例,这两个Stage的任务类型分别为ShuffleMapTask和ResultTask:

// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建 Spark 上下文环境对象(连接对象)
val sc = new SparkContext(sparkConf)
// 读取文件数据
val fileRDD = sc.textFile("input")
// 将文件中的数据进行分词
val wordRDD = fileRDD.flatMap(_.split(" "))
// 转换数据结构 word => (word, 1)
val word2oneRDD = wordRDD.map((_, 1))
// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD = word2OneRDD.reduceByKey(_ + _)
// 将数据聚合结果采集到内存中
val word2Count = word2CountRDD.collect()

下面将描述一下这两种类型Task的处理过程,目的在于对ShuffleRDD带来的计算成本有更清楚的认识,也能够学习Spark对于Shuffle过程是如何进行高效实现的。下面的描述针对的Spark版本为3.1.2,与之前版本的Spark有所不同,老版本的spark.shuffle.consolidateFiles配置已经不存在,默认进行了合并。

1、ShuffleMapTask

1个task只会生成1个data文件和1个indexfile,data文件存储下游多个partition的数据,而indexfile里存储的就是每个partition在这个文件的offset。示例里的ShuffleMapTask下游的ResultTask数量为2,也就是也就是indexfile存储了两个partition的offset。

index文件存储的两个parition的offset,分别为0x41(65)和0x8b(139)也就是每个partition数据的长度。 对应的data文件:

 可以看到第一个LZ4Block结束于第65个字节,第二个结束于第139个字节。

2、ResultTask

从ShuffleMapTask生成的数据文件中获取数据,这个也是通过netty来读取的,对应的通信方是ExceutorBackend的Endpoint(用于执行task的是这个进程里的Executor对象)。ResultTask的数量如果没有设置spark.default.parallelism,则由上游rdd的最大partitions决定。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752509.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号