栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spark源码篇-分区:读取内存数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark源码篇-分区:读取内存数据

package pro_spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Study example: how Spark partitions an in-memory collection.
 *
 * The program builds an RDD from `List(1, 2, 3)` with an explicit slice count of 3 and
 * writes it to the relative directory `output`, so the resulting part files show which
 * element ended up in which partition. The two triple-quoted strings at the end of
 * `main` are reading notes only (an excerpt of the relevant source and a worked
 * example); they are evaluated and discarded and have no effect on the job.
 */
object ArrayCreateRDDByPartition03 {

  /**
   * Runs the partitioning demo on a local Spark master.
   *
   * @param args command-line arguments; not used by this program
   */
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("ArrayCreateRDDByPartition03").setMaster("local[*]")

    val sc = new SparkContext(conf)

    // Always release the context, even when the job fails. A common failure here is
    // that the "output" directory already exists from a previous run
    // (NOTE(review): Spark normally refuses to overwrite it; delete it before re-running).
    try {
      // Second argument is the requested number of slices (partitions).
      val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3), 3)

      listRDD.saveAsTextFile("output")
    } finally {
      sc.stop()
    }

    // Reading notes, part 1: the call chain followed by makeRDD, as quoted below:
    // makeRDD -> parallelize -> ParallelCollectionRDD.getPartitions
    //         -> ParallelCollectionRDD.slice -> array slice(from, until).
    // Kept as a discarded string literal, exactly as the author wrote it.
    """
      |  def makeRDD[T: ClassTag](
      |      seq: Seq[T],
      |      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      |    parallelize(seq, numSlices)
      |  }
      |
      |  def parallelize[T: ClassTag](
      |      seq: Seq[T],
      |      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      |    assertNotStopped()
      |    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      |  }
      |
      |  private[spark] class ParallelCollectionRDD[T: ClassTag](
      |    sc: SparkContext,
      |    @transient private val data: Seq[T],
      |    numSlices: Int,
      |    locationPrefs: Map[Int, Seq[String]])
      |    extends RDD[T](sc, Nil)
      |
      |  override def getPartitions: Array[Partition] = {
      |    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
      |    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
      |  }
      |
      |  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
      |    if (numSlices < 1) {
      |      throw new IllegalArgumentException("Positive number of partitions required")
      |    }
      |    // Sequences need to be sliced at the same set of index positions for operations
      |    // like RDD.zip() to behave as expected
      |    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      |      (0 until numSlices).iterator.map { i =>
      |        val start = ((i * length) / numSlices).toInt
      |        val end = (((i + 1) * length) / numSlices).toInt
      |        (start, end)
      |      }
      |    }
      |    seq match {
      |      case r: Range =>
      |        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
      |          // If the range is inclusive, use inclusive range for the last slice
      |          if (r.isInclusive && index == numSlices - 1) {
      |            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
      |          }
      |          else {
      |            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
      |          }
      |        }.toSeq.asInstanceOf[Seq[Seq[T]]]
      |      case nr: NumericRange[_] =>
      |        // For ranges of Long, Double, BigInteger, etc
      |        val slices = new ArrayBuffer[Seq[T]](numSlices)
      |        var r = nr
      |        for ((start, end) <- positions(nr.length, numSlices)) {
      |          val sliceSize = end - start
      |          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
      |          r = r.drop(sliceSize)
      |        }
      |        slices
      |      case _ =>
      |        val array = seq.toArray // To prevent O(n^2) operations for List etc
      |        positions(array.length, numSlices).map { case (start, end) =>
      |            array.slice(start, end).toSeq
      |        }.toSeq
      |    }
      |  }
      |
      |  override def slice(from: Int, until: Int): Array[T] = {
      |     val reprVal = repr
      |     val lo = math.max(from, 0)
      |     val hi = math.min(math.max(until, 0), reprVal.length)
      |     val size = math.max(hi - lo, 0)
      |     val result = java.lang.reflect.Array.newInstance(elementClass, size)
      |     if (size > 0) {
      |      Array.copy(reprVal, lo, result, 0, size)
      |     }
      |     result.asInstanceOf[Array[T]]
      |  }
      |
      |""".stripMargin


    // Reading notes, part 2: worked example for seq = 1, 2, 3 and numSlices = 3.
    // Applying the positions formula quoted below for i = 0, 1, 2 gives the index
    // ranges (0,1), (1,2), (2,3); slice(from, until) copies [from, until), so each
    // of the three partitions receives exactly one element: 1, 2 and 3 respectively.
    """
      | seq = 1, 2, 3 、numSlices = 3
      |
      | def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      |      (0 until numSlices).iterator.map { i =>
      |        val start = ((i * length) / numSlices).toInt
      |        val end = (((i + 1) * length) / numSlices).toInt
      |        (start, end)
      |      }
      |    }
      |
      |  start = (0 * 3) / 3 = 0
      |  end   = (0 + 1) * 3 / 3 = 1
      |  (0,1)
      |  .......
      |  start = (2 * 3) / 3 = 2
      |  end   = (2 + 1) * 3 / 3 = 3
      |  (2,3)
      |
      |  override def slice(from: Int, until: Int): Array[T] = {
      |     val reprVal = repr
      |     val lo = math.max(from, 0)
      |     val hi = math.min(math.max(until, 0), reprVal.length)
      |     val size = math.max(hi - lo, 0)
      |     val result = java.lang.reflect.Array.newInstance(elementClass, size)
      |     if (size > 0) {
      |      Array.copy(reprVal, lo, result, 0, size)
      |     }
      |     result.asInstanceOf[Array[T]]
      |  }
      |  seq = 1, 2, 3 (1,1)(2,2)(3,3)
      |
      |""".stripMargin

  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/888043.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号