栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Spark程序执行1】SparkContext对象构建以及RDD依赖解析

【Spark程序执行1】SparkContext对象构建以及RDD依赖解析

【Spark程序执行1】SparkContext对象构建
【Spark程序执行2】阶段划分(dagScheduler)

1,构建SparkContext对象,以及该对象核心属性介绍:
class SparkContext(config: SparkConf) extends Logging {
   ......
   //spark配置信息:基础配置 | 环境配置
  private var _conf: SparkConf = _ 
  private var _eventLogDir: Option[URI] = None
  private var _eventLogCodec: Option[String] = None
  private var _listenerBus: LiveListenerBus = _
  //spark环境变量
  
  private var _env: SparkEnv = _
  //spark状态传输
  private var _statusTracker: SparkStatusTracker = _
  private var _progressBar: Option[ConsoleProgressBar] = None
  private var _ui: Option[SparkUI] = None
  //Hadoop配置信息属性,加载Hadoop相关属性:*.xml配置
  private var _hadoopConfiguration: Configuration = _
  //executer内存
  private var _executorMemory: Int = _
  //通信后台 ,主要和Excutor进行通信
  private var _schedulerBackend: SchedulerBackend = _
  //任务调度器,调度任务,
  private var _taskScheduler: TaskScheduler = _
  //心跳接受
  private var _heartbeatReceiver: RpcEndpointRef = _
  //阶段调度,阶段划分以及任务切分
  @volatile private var _dagScheduler: DAGScheduler = _
  private var _applicationId: String = _
  private var _applicationAttemptId: Option[String] = None
  private var _eventLogger: Option[EventLoggingListener] = None
  private var _driverLogger: Option[DriverLogger] = None
  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
  private var _cleaner: Option[ContextCleaner] = None
  //监听
  private var _listenerBusStarted: Boolean = false
  private var _jars: Seq[String] = _
  private var _files: Seq[String] = _
  private var _shutdownHookRef: AnyRef = _
  //状态存储
  private var _statusStore: AppStatusStore = _
  //心跳发射
  private var _heartbeater: Heartbeater = _
  private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
  private var _shuffleDriverComponents: ShuffleDriverComponents = _
  private var _plugins: Option[PluginContainer] = None
......
}
2.RDD依赖 2.1:RDD之间的窄依赖

从构建RDD的其中一个方法入手:通过读取文件构建RDD

  
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    //读取hadoop file文件
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

通过map构建MapPartitionsRDD对象,在构建MapPartitionRDD会传递一个this对象,this就是当前RDD对象,那说明转换算子时,是将当前对象传递给下一个RDD对象;

  
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

如下图所示:

每个RDD会继承RDD抽象类,并且会调用RDD的构造函数

依赖RDD的辅助构造函数,并且在辅助构造函数中会传递一个OneToOneDependency对象,并且构建这个对象时,会将父RDD对象传递;

  
  def this(@transient oneParent: RDD[_]) =
  //在辅助构造函数中,会传递一个oneToOneDependency 对象
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

RDD的构造函数

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]] //依赖关系列表
  ) extends Serializable with Logging {
......
}

OneToOneDependency对象会继承NarrowDependency类,表示窄依赖;


@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

而NarrowDependency 会将传递的RDD作为自己的属性对象,做后续操作

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

通过上述操作,转换算子通过窄依赖形成RDD之间的依赖关系;

2.2 RDD之间的宽依赖(Shuffle依赖):

通过rdd的groupBy来说明宽依赖

  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    //调用groupByKey
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergevalue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    //合并操作
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergevalue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

combineByKeyWithClassTag方法中会构建ShuffleRDD对象,

ShuffleRDD会继承RDD抽象类,并且在调用父RDD构造函数时,传递依赖参数时,传递Nil默认值。

@DeveloperApi
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
    //Nil为默认的依赖关系
  extends RDD[(K, C)](prev.context, Nil) {
  ......
  }


getDependencies()方法,获取依赖关系,依赖关系为ShuffleDependency

 override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

ShuffleDependency类属性

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
  }

依赖类说明


@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}



 //不会引起shuffle操作的算子之间的依赖关系
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}



 //引起shuffle操作的算子之间的依赖关系
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}



 //一对一依赖,窄依赖的子类,也是不会引起shuffle操作算子之间的依赖关系
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}



 //多对一依赖 窄依赖的子类
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

RDD之间的依赖关系图:
MapPartitionsRDD之间通过窄依赖,ShuffledRDD与MapPartitionsRDD之间,以及ShuffledRDD与ShuffledRDD之间通过ShuffleDependency

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699978.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号