IO流的方式读取数据
1、字节流的方式读取数据(一次读取一个字节)
InputStream in = new FileInputStream("input")
2、字节缓冲流的方式读取数据(把数据缓存起来,批量读取)
InputStream in = new BufferedInputStream(FileInputStream("input"))
3、一次读取一行的缓存流的方式读取数据
BufferedReader reader = new BufferedReader(new InputStreamReader( new FileInputStream("input"), "UTF-8") )
流的读取,都是在调用read。称之为惰性加载。
IO流和RDD之间关系
//读取外部文件
val textRDD: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\")
//对读取到的内容进行切割并进行扁平化处理
val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
//对数据集中的内容进行结构转换
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
//对相同的单词key的value进行聚合
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
//将执行结果进行收集
val res: Array[(String, Int)] = reduceRDD.collect()
res.foreach(println)
1)产生一个lineRdd
2)flatMap包装了lineRdd,返回wordRdd
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}
3)map包装了wordRdd ,返回wordToOneRdd
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
4)只有最后调用collect()才会执行
2.RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。 代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
1)弹性
存储的弹性:内存与磁盘的自动切换;
容错的弹性:数据丢失可以自动恢复;
计算的弹性:计算出错重试机制;
分片的弹性:可根据需要重新分片。
2)分布式
数据存储在大数据集群不同节点上
3)数据集
RDD封装了计算逻辑,并不保存数据
4)数据抽象
RDD是一个抽象类,需要子类具体实现
5)不可变
RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
6)可分区、并行计算
3.RDD特性 Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)
(1)一组分区(Partition),即是数据集的基本组成单位;
protected def getPartitions:Array[Partition]
(2)一个计算每个分区的函数;
def compute(split: Partition, context: TaskContext): Interator[T]
(3)RDD之间的依赖关系;
protected def getDependencies: Seq[Dependency[_]] = deps
(4)一个Partitioner,即RDD的分片函数;控制分区的数据流向(键值对)
val partitioner : scala.Option[org.apache.spark.Partitioner]
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。 移动数据不如移动计算,除非资源不够。
protected def getPreferredLocations(split : Partition) : scala.Seq[String]
A list of partitions
多个分区,分区可以看成是数据集的基本组成单位
对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。
用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的 CPU Core 的数目.
每个分配的存储是由BlockManager 实现的, 每个分区都会被逻辑映射成 BlockManager 的一个 Block,,而这个 Block 会被一个 Task 负责计算。
A function for computing each split
- (分区)的函数.
- 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
- A list of dependencies on other RDDs
- RDD 之间的依赖关系
- 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- RDD,还有一个可选的分区器
- key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
- (preferred location)位置的列表
- HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
1.编程模型
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。RDD经过一系列的transformations转换定义之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果,或者是向存储系统保存数据。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算)。
sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(args(1))
算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。
2.RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
1.IDEA环境准备
1)新建maven工程添加依赖
org.apache.spark spark-core_2.112.1.1 SparkCoreTest net.alchim31.maven scala-maven-plugin3.4.6 compile testCompile
3)添加scala 框架支持
添加框架支持勾选scala
3)创建一个scala文件夹,并把它修改为Source Root
2.从集合中创建1)从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
package com.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_CreateRDD_mem {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//创建一个集合 对象
val list = List(1, 2, 3, 4)
//根据集合创建RDD 方式一
//val rdd: RDD[Int] = sc.parallelize(list)
//根据集合创建RDD 方式二
val rdd: RDD[Int] = sc.makeRDD(list)
rdd.foreach(println)
//释放资源
sc.stop()
}
}
3.从外部存储系统的数据集创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等
1)数据准备
在项目中新建input文件夹,新建1.txt和2.txt。每个文件里面准备一些word单词。
2)创建RDD
package com.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_CreateRDD_File {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//从本地文件读取数据 创建RDD
//val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\*")
//从HDFS上读取数据,创建RDD
val rdd: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input")
rdd.collect().foreach(println)
//释放资源
sc.stop()
}
}
3.分区规则
1.默认分区源码(RDD数据从集合中创建)
1)默认分区数源码解读
2)代码
package com.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Partition_default {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//通过集合创建RDD
//val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//分区默认数 numSlices: Int = defaultParallelism) scheduler.conf.getInt("spark.default.parallelism", totalCores)
//def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
//通过读取外部文件创建RDD
val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\1.txt")
//def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
//查看分区效果
println(rdd.partitions.size)
//rdd.saveAsTextFile("D:\output")
//释放资源
sc.stop()
}
}
2.分区源码(RDD数据从集合中创建)
1)分区测试(RDD数据从集合中创建)
package com.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_Partition_mem {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//输入数据1 2 3 4 ,采用默认分区方式, 最终分区数2, 0->1 1-> 2,3,4
//val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\2.txt")
//输入数据1 2 3 4 ,minPartitions设置为3, 最终分区数3, 0->1,2 1->3 2->4
//val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\2.txt",3)
//输入数据123456,minPartitions设置为3, 最终分区数3, 0->123456 1-> 2->
//val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\2.txt",3)
//输入数据123 4567,minPartitions设置为3, 最终分区数3, 0->123 1->4567 2->
//val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\2.txt",3)
//19个字节/5
val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\3.txt",5)
rdd.saveAsTextFile("D:\mywork\IDEAproject\spark-11\src\main\output")
//释放资源
sc.stop()
}
}
3.分区源码(RDD数据从文件中读取后创建)
1)分区测试
package com.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_Partition_file {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//通过集合创建RDD
//集合中四个数据,默认分区数---实际输出12个分区---分区中的数据分布
//2分区->1 5分区->2 8分区->3 11分区->4
//val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//集合中四个数据,--实际输出4个分区---分区中的数据分布
//0分区->1 1分区->2 2分区->3 3分区->4
//val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),4)
//集合中四个数据,---实际输出3个分区---分区中的数据分布
//0分区->1 1分区->2 2分区->3 4
//val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),3)
//集合中五个数据,---实际输出3个分区---分区中的数据分布
//0分区->1 1分区->2 3 2分区->4 5
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5),3)
//start = ((i * 元素个数)/分区数)
//end = (((i + 1) * 元素个数)/分区数)
//前闭后开
rdd.saveAsTextFile("D:\mywork\IDEAproject\spark-11\src\main\output")
//释放资源
sc.stop()
}
}
2)源码解析
getSplits文件返回切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量
start=split.getStart() end = start + split.getLength
4.Transformation转换算子Spark_Transformation转换算子_asd623444055的博客-CSDN博客
5.Action行动算子Spark_Action行动算子_asd623444055的博客-CSDN博客
6.DD序列化在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
1.闭包检查
1)闭包引入
object serializable01_object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建两个对象
val user1 = new User()
user1.name = "zhangsan"
val user2 = new User()
user2.name = "lisi"
val userRDD1: RDD[User] = sc.makeRDD(List(user1, user2))
//3.1 打印,ERROR报java.io.NotSerializableException
//userRDD1.foreach(user => println(user.name))
//3.2 打印,RIGHT
val userRDD2: RDD[User] = sc.makeRDD(List())
//userRDD2.foreach(user => println(user.name))
//3.3 打印,ERROR Task not serializable 注意:没执行就报错了
userRDD2.foreach(user => println(user1.name))
//4.关闭连接
sc.stop()
}
}
//class User {
// var name: String = _
//}
class User extends Serializable {
var name: String = _
}
2)闭包检查
2.序列化方法和属性
1)说明
Driver:算子以外的代码都是在Driver端执行
Executor:算子里面的代码都是在Executor端执行
2)代码实现
object serializable02_function {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
//3.1创建一个Search对象
val search = new Search("hello")
// Driver:算子以外的代码都是在Driver端执行
// Executor:算子里面的代码都是在Executor端执行
//3.2 函数传递,打印:ERROR Task not serializable
search.getMatch1(rdd).collect().foreach(println)
//3.3 属性传递,打印:ERROR Task not serializable
search.getMatche2(rdd).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
class Search(query:String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
//rdd.filter(this.isMatch)
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatche2(rdd: RDD[String]): RDD[String] = {
//rdd.filter(x => x.contains(this.query))
rdd.filter(x => x.contains(query))
//val q = query
//rdd.filter(x => x.contains(q))
}
}
3)问题一说明
//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
(1)在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
(2)解决方案
类继承scala.Serializable即可。
class Search() extends Serializable{...}
4)问题二说明
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
(1)在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
(2)解决方案一
(a)类继承scala.Serializable即可。
class Search() extends Serializable{...}
(b)将类变量query赋值给局部变量
修改getMatche2为
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
val q = this.query//将类变量赋值给局部变量
rdd.filter(x => x.contains(q))
}
(3)解决方案二
把Search类变成样例类,样例类默认是序列化的。
case class Search(query:String) extends Serializable {...}
3.Kryo序列化框架
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的提交也比较大.
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用kryo来序列化。
注意:即使使用kryo序列化,也要继承Serializable接口。
object serializable03_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "spark", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
7.RDD依赖关系
1.查看血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
toDebugString
2.查看依赖关系
dependencies
object Spark04_TestLineage {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val rdd: RDD[String] = sc.makeRDD(List("hello spark","hello jingjing"),2)
//查看RDD的血缘关系
println(rdd.toDebugString)
//查看RDD的依赖关系
println(rdd.dependencies)
println("------------------------------")
val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
println(flatMapRDD.toDebugString)
println(flatMapRDD.dependencies)
println("------------------------------")
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_,1))
println(mapRDD.toDebugString)
println(mapRDD.dependencies)
println("------------------------------")
val resRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resRDD.toDebugString)
println(resRDD.dependencies)
println("------------------------------")
// 关闭连接
sc.stop()
}
}
3.窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
4.宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的 transformations 包括: sort, reduceByKey, groupByKey, join, 和调用rePartition函数的任何操作.
宽依赖对 Spark 去评估一个 transformations 有更加重要的影响, 比如对性能的影响.
5.Spark中的Job调度
一个Spark应用包含一个驱动进程(driver process,在这个进程中写Spark的逻辑代码)和多个执行器进程(executor process,跨越集群中的多个节点)。Spark 程序自己是运行在驱动节点, 然后发送指令到执行器节点。
一个Spark集群可以同时运行多个Spark应用, 这些应用是由集群管理器(cluster manager)来调度。
Spark应用可以并发的运行多个job, job对应着给定的应用内的在RDD上的每个 action操作。
- Spark应用
一个Spark应用可以包含多个Spark job, Spark job是在驱动程序中由SparkContext 来定义的。
当启动一个 SparkContext 的时候, 就开启了一个 Spark 应用。 一个驱动程序被启动了, 多个执行器在集群中的多个工作节点(worker nodes)也被启动了。 一个执行器就是一个 JVM, 一个执行器不能跨越多个节点, 但是一个节点可以包括多个执行器。
一个 RDD 会跨多个执行器被并行计算. 每个执行器可以有这个 RDD 的多个分区, 但是一个分区不能跨越多个执行器.
- Spark Job 的划分
由于Spark的懒执行, 在驱动程序调用一个action之前, Spark 应用不会做任何事情,
针对每个action,Spark 调度器就创建一个执行图(execution graph)和启动一个 Spark job。
每个 job 由多个stages 组成, 这些 stages 就是实现最终的 RDD 所需的数据转换的步骤。一个宽依赖划分一个stage。每个 stage 由多个 tasks 来组成, 这些 tasks 就表示每个并行计算, 并且会在多个执行器上执行。
6.Stage任务划分
1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。例如,DAG记录了RDD的转换过程和任务的阶段。
2)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext(newSparkConf()
.setAppName("SparkCoreTest")
.setMaster("local[*]"))
val resultRDD = sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) resultRDD.collect()
resultRDD.saveAsTextFile("output")
}
1.执行main方法->初始化sc->执行到Action算子1
2.DAGScheduler对job0切分Stage,Stage产生Task
3.TaskScheduler通过TaskSet获取job0的所有Task,然后序列化发往Executor
3)代码
object Spark05_task {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,1,2),2)
//聚合
val resultRDD: RDD[(Int, Int)] = dataRDD.map((_,1))
// Job:一个Action算子就会生成一个Job;
//job1打印到控制台
resultRDD.collect().foreach(println)
//job2输出到磁盘
resultRDD.saveAsTextFile("D:\")
Thread.sleep(10000000)
// 关闭连接
sc.stop()
}
}
4)查看Job个数
查看localhost:4040,发现Job有两个。
5)查看Stage个数
查看Job0的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
查看Job1的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
7.Stage任务划分源码分析
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
… …
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
… …
}
}
//提交作业
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
def post(event: E): Unit = {
eventQueue.put(event)
}
快捷键ctrl+H,查看eventQueue的子实现类DAGSchedulerEventProcessLoop
在DAGSchedulerEventProcessLoop里面找到onReceive方法
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
… …
}
//划分阶段,提交阶段
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) {
var finalStage: ResultStage = null
try {
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
… …
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
... …
submitStage(finalStage)
}
private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
//获取shuffle依赖
private[scheduler] def getShuffleDependencies( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
//创建阶段
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
createShuffleMapStage(shuffleDep, firstJobId)
}
}
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
... ...
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
... ...
stage
}
//提交阶段
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
//提交Task
private def submitMissingTasks(stage: Stage, jobId: Int) {
......
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
}
if (tasks.size > 0) {
stage.pendingPartitions ++= tasks.map(_.partitionId)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
} else {
markStageAsFinished(stage, None)
}
submitWaitingChildStages(stage)
}



