- 一,驱动程序
- 二,Task相关类
- 2.1 查看ResultTask类runTask(context: TaskContext)方法:
- 2.2,反序列化后执行func函数
- 三,MapPartitionsRDD 类
- 四,ParallelCollectionRDD
- 五,ShuffleMapTask
- 六,总结
val sparkConnf=new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext=new SparkContext(sparkConnf)
val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 3)
val rdd_increace=rdd.map(_+1)
rdd_increace.collect()
sparkContext.stop()
上面的代码创建了俩RDD,没有shuffle依赖,所以只有ResultTask,数量为MapPartitionsRDD分区数3。
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
其中MapPartitionsRDD对象中f函数的方法体是“_+1”。
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev)
二,Task相关类
spark的分布式计算是通过将Task对象分发到不同的Excutor中反序列化后执行Task.runTask(context: TaskContext)方法来执行各个分区具体计算逻辑的。
2.1 查看ResultTask类runTask(context: TaskContext)方法:override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
先通过反序列化拿到rdd和func,跟踪程序的方法调用链,
在这里rdd是驱动程序中的
rdd_increace:MapPartitionsRDD对象;
func是一个函数其类型为:
(ctx: TaskContext, it: Iterator[T]): => Iterator[U]
func函数的函数体来源于rdd_increace.collect() ,并在调用过程中一直往后传递,具体传递过程可跟踪源码中的调用链。
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
在这里func反序列化后的大概结果
val func=(ctx: TaskContext, it: Iterator[Int]) => {it.toArray}
2.2,反序列化后执行func函数
func(context, rdd.iterator(partition, context))
查看 rdd.iterator(partition, context)
iterator(split: Partition, context: TaskContext)方法
是在RDD类中定义的不可变函数,其作用是要么从缓存checkpoint中读取数据要么计算数据。其中执行逻辑计算的关键代码是compute(split, context)。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
compute(split: Partition, context: TaskContext): Iterator[T] 是定义在RDD中的抽象类,具体实现在RDD子类中。
def compute(split: Partition, context: TaskContext): Iterator[T]三,MapPartitionsRDD 类
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
.
.
.
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
.
.
.
}
此处f函数是类构造器参数,它的值是在上文驱动程序map转换操作的的值,其函数体为“_+1”操作。
在执行f函数之前,先计算函数参数的结果,这里firstParent[T]返回父依赖的第一个RDD(MapPartitionsRDD只会有一个父RDD,UnionRDD会有多个),父RDD重复这个iterator->compute方法调用过程执行自己的compute(split: Partition, context: TaskContext)函数并返回处理的结果组成的Iterator[T],
这里的父RDD为ParallelCollectionRDD
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
ParallelCollectionRDD的compute函数将集合元素的迭代器包装为InterruptibleIterator后返回,因为其没有上一级的RDD,所以并没有继续迭代调用。到此为止RDD的依赖链执行完毕。
五,ShuffleMapTaskTask有俩类型,ResultTask和ShuffleMapTask。ShuffleMapTask会涉及到临时缓存文件的IO。
可以看出这里在ShuffleMapTask写缓存的时候也是和ResultTask通过iterator调用compute方法的方式触发rdd依赖链各算子的顺序计算的。
六,总结1,RDD算子的计算逻辑是通过函数类型的类参数传递给各RDD类的,并在RDD的
def compute(split: Partition, context: TaskContext): Iterator[T]
类方法中被调用。compute被
final def iterator(split: Partition, context: TaskContext): Iterator[T]
调用,
2,如果RDD的父RDD不为Nil,则在执行compute方法时,会通过计算方法参数值的方式迭代执行其父RDD的compute方法。通过此种方式可以保证RDD DAG中每个RDD都会被顺序执行。如果某个RDD的父级RDD存在检查点checkpoint,则从缓存中读取数据不用重复计算。
3,RDD的执行时在Task任务的runTask(context: TaskContext)方法中被调用的,Task任务是在Application被提交后Task任务生成并分配到特定Excutor中执行的,转换RDD不会提交Task,行动RDD才会,所以没有行动RDD不会触发任务执行。



