栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark源代码解读之rdd分区策略

Spark源代码解读之rdd分区策略

分区数量设置

目录

分区数量设置

 分区数据分配策略


//创建RDD
val rdd1 = sc.parallelize(seq)

 parallelize方法源码:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

第二个参数numSlices就是表示分区数量,当使用parallelize方法创建rdd时可以指定分区数量也可以不指定,当不指定分区数量时会有一个默认分区数量(并行度)。

当没有配置分区数量时:

def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}

 点击taskScheduler.defaultParallelism的defaultParallelism

// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
def defaultParallelism(): Int

可以看到是一个抽象的方法,我们找它的实现方法,

override def defaultParallelism(): Int = backend.defaultParallelism()

 再点击defaultParallelism()

def defaultParallelism(): Int

依然是一个抽象方法,继续找实现类LocalSchedulerBackend中的实现方法

override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)

可以看到如果我们配置spark.default.parallelism这个参数,默认分区就是spark.default.parallelism的值,如果没有就是当前环境的所有核数totalCores。

 分区数据分配策略

 每个分区存储哪些数据呢?具体的计算方式如下:点击ParallelCollectionRDD类,

private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {
  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
  // instead.
  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
    
//获取分区数
  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }
}

 ParallelCollectionRDD类中getPartitions方法便是获取分区分配结果,slice方法中进行具体分配。

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Sequences need to be sliced at the same set of index positions for operations
  // like RDD.zip() to behave as expected
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
....
}

 可以看到positions方法就是具体的分区分配策略。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350087.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号