栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark源码跟踪(六)RDD逻辑代码执行

spark源码跟踪(六)RDD逻辑代码执行

RDD逻辑代码执行
  • 一,驱动程序
  • 二,Task相关类
    • 2.1 查看ResultTask类runTask(context: TaskContext)方法:
    • 2.2,反序列化后执行func函数
  • 三,MapPartitionsRDD 类
  • 四,ParallelCollectionRDD
  • 五,ShuffleMapTask
  • 六,总结

一,驱动程序
val sparkConnf=new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext=new SparkContext(sparkConnf)
val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 3)
val rdd_increace=rdd.map(_+1)
rdd_increace.collect()
sparkContext.stop()


上面的代码创建了俩RDD,没有shuffle依赖,所以只有ResultTask,数量为MapPartitionsRDD分区数3。

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
   assertNotStopped()
   new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
 }
 
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

其中MapPartitionsRDD对象中f函数的方法体是“_+1”。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) 
二,Task相关类

spark的分布式计算是通过将Task对象分发到不同的Excutor中反序列化后执行Task.runTask(context: TaskContext)方法来执行各个分区具体计算逻辑的。

2.1 查看ResultTask类runTask(context: TaskContext)方法:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  
  func(context, rdd.iterator(partition, context))
}

先通过反序列化拿到rdd和func,跟踪程序的方法调用链,
在这里rdd是驱动程序中的
rdd_increace:MapPartitionsRDD对象;
func是一个函数其类型为:
(ctx: TaskContext, it: Iterator[T]): => Iterator[U]

func函数的函数体来源于rdd_increace.collect() ,并在调用过程中一直往后传递,具体传递过程可跟踪源码中的调用链。

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

在这里func反序列化后的大概结果

val func=(ctx: TaskContext, it: Iterator[Int]) => {it.toArray}
2.2,反序列化后执行func函数
func(context, rdd.iterator(partition, context))

查看 rdd.iterator(partition, context)
iterator(split: Partition, context: TaskContext)方法
是在RDD类中定义的不可变函数,其作用是要么从缓存checkpoint中读取数据要么计算数据。其中执行逻辑计算的关键代码是compute(split, context)。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}


private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

compute(split: Partition, context: TaskContext): Iterator[T] 是定义在RDD中的抽象类,具体实现在RDD子类中。

def compute(split: Partition, context: TaskContext): Iterator[T]
三,MapPartitionsRDD 类
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
  .
  .
  .
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
 .
 .
 .
}

此处f函数是类构造器参数,它的值是在上文驱动程序map转换操作的的值,其函数体为“_+1”操作。
在执行f函数之前,先计算函数参数的结果,这里firstParent[T]返回父依赖的第一个RDD(MapPartitionsRDD只会有一个父RDD,UnionRDD会有多个),父RDD重复这个iterator->compute方法调用过程执行自己的compute(split: Partition, context: TaskContext)函数并返回处理的结果组成的Iterator[T],
这里的父RDD为ParallelCollectionRDD

四,ParallelCollectionRDD
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
  new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}

ParallelCollectionRDD的compute函数将集合元素的迭代器包装为InterruptibleIterator后返回,因为其没有上一级的RDD,所以并没有继续迭代调用。到此为止RDD的依赖链执行完毕。

五,ShuffleMapTask

Task有俩类型,ResultTask和ShuffleMapTask。ShuffleMapTask会涉及到临时缓存文件的IO。

可以看出这里在ShuffleMapTask写缓存的时候也是和ResultTask通过iterator调用compute方法的方式触发rdd依赖链各算子的顺序计算的。

六,总结

1,RDD算子的计算逻辑是通过函数类型的类参数传递给各RDD类的,并在RDD的
def compute(split: Partition, context: TaskContext): Iterator[T]
类方法中被调用。compute被
final def iterator(split: Partition, context: TaskContext): Iterator[T]
调用,
2,如果RDD的父RDD不为Nil,则在执行compute方法时,会通过计算方法参数值的方式迭代执行其父RDD的compute方法。通过此种方式可以保证RDD DAG中每个RDD都会被顺序执行。如果某个RDD的父级RDD存在检查点checkpoint,则从缓存中读取数据不用重复计算。
3,RDD的执行时在Task任务的runTask(context: TaskContext)方法中被调用的,Task任务是在Application被提交后Task任务生成并分配到特定Excutor中执行的,转换RDD不会提交Task,行动RDD才会,所以没有行动RDD不会触发任务执行。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583503.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号