package pro_spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates how `SparkContext.makeRDD`/`parallelize` slice a collection
 * into partitions: 3 elements with `numSlices = 3` yields exactly one
 * element per partition file under the "output" directory.
 */
object ArrayCreateRDDByPartition03 {

  def main(args: Array[String]): Unit = {
    // local[*]: run on all local cores; app name matches the object for easy Spark-UI lookup.
    val conf: SparkConf = new SparkConf()
      .setAppName("ArrayCreateRDDByPartition03")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // 3 elements, 3 slices -> part-00000, part-00001, part-00002, one element each.
      val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3), 3)
      // NOTE(review): Spark refuses to overwrite an existing directory — delete
      // "output" between runs, or this throws. TODO confirm desired behavior.
      listRDD.saveAsTextFile("output")
    } finally {
      // Always release the context, even if the job above fails.
      sc.stop()
    }

    // --- Reference notes: how the 3 elements land in 3 partitions ----------
    // (Previously held in discarded string literals; kept here as comments.)
    //
    // makeRDD simply forwards to parallelize:
    //   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
    //     withScope { parallelize(seq, numSlices) }
    //   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
    //     withScope {
    //       assertNotStopped()
    //       new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    //     }
    //
    // ParallelCollectionRDD.getPartitions calls ParallelCollectionRDD.slice(data, numSlices):
    //   override def getPartitions: Array[Partition] = {
    //     val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    //     slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
    //   }
    //
    // slice() rejects numSlices < 1, then computes the same index positions for
    // every sequence type so operations like RDD.zip() line up:
    //   def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    //     (0 until numSlices).iterator.map { i =>
    //       val start = ((i * length) / numSlices).toInt
    //       val end   = (((i + 1) * length) / numSlices).toInt
    //       (start, end)
    //     }
    // Ranges/NumericRanges get dedicated cases; everything else is materialized
    // via seq.toArray (avoids O(n^2) slicing on List) and cut with array.slice.
    //
    // With seq = List(1, 2, 3) and numSlices = 3:
    //   i = 0: start = (0*3)/3 = 0, end = (1*3)/3 = 1  -> (0, 1) -> element 1
    //   i = 1: start = (1*3)/3 = 1, end = (2*3)/3 = 2  -> (1, 2) -> element 2
    //   i = 2: start = (2*3)/3 = 2, end = (3*3)/3 = 3  -> (2, 3) -> element 3
    //
    // Each (start, end) pair is a half-open range handed to ArrayOps.slice:
    //   override def slice(from: Int, until: Int): Array[T] = {
    //     val reprVal = repr
    //     val lo   = math.max(from, 0)
    //     val hi   = math.min(math.max(until, 0), reprVal.length)
    //     val size = math.max(hi - lo, 0)
    //     val result = java.lang.reflect.Array.newInstance(elementClass, size)
    //     if (size > 0) Array.copy(reprVal, lo, result, 0, size)
    //     result.asInstanceOf[Array[T]]
    //   }
    // so each of the three partitions receives exactly one element.
  }
}