栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark编程03——RDD并行度与分区

spark编程03——RDD并行度与分区

默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度,这个数量可以在构建RDD时指定。

1.从内存中读取:数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

2.从文件中读取:数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下:

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {

    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
      
    ...
    
    for (FileStatus file: files) {
    
        ...
    
    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          ...

  }
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

例:

import org.apache.spark.{SparkConf, SparkContext}
object spark03_并行与分区 {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local").setAppName("spark03")
    val sc=new SparkContext(sparkConf)
    //从集合(内存)
    val rdd1=sc.makeRDD(List(1,2,3,4),3)
    rdd1.saveAsTextFile("output")
    sc.stop()
  }
}

所形成的三个分区: 

 

import org.apache.spark.{SparkConf, SparkContext}
object spark03_并行与分区 {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local").setAppName("spark03")
    val sc=new SparkContext(sparkConf)
    //读文件数据
    val rdd2=sc.makeRDD("datas/1.txt",2)
    rdd2.saveAsTextFile("output1")
    sc.stop()
  }
}

形成的两个分区:

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752798.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号