栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

9.2.6、Spark Core

9.2.6、Spark Core

1、求3.14的值(基本分区数)

分区数可以设置,由parallelize第二个参数指定,默认分区数等于并行度,并行度大小可以由conf.setMaster设置

object Demo11Pi {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11Pi")

    
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    
    val sum: Int = 10000
    val listRDD: RDD[Int] = sc.parallelize(1 to sum, 5)

    val random: Random = new Random()

    //在圆内的
    val circleCount: Long = listRDD.filter(line => {
      val x: Double = random.nextDouble() * 2 - 1
      val y: Double = random.nextDouble() * 2 - 1
      (x * x + y * y <= 1)
    }).count()

    println(circleCount.toDouble / sum * 4)
  }
}
2、运行结构

在executor中还有一个blockmanager用于管理executor中的数据

(1)RDD中的数据
(2)累加器和广播变量
(3)shuffle产生的文件

3、shuffle过程,reduce分区数

问题:每一个key对应的value不一定都是在一个partition中 ,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,他的partition极有可能分布在各个节点上。

如何聚合?

Shuffle Write:上一个stage的每个map task就必须保证将自己处理 的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个 不同的分区文件中

Shuffle Read:reduce task就会从上一个stage的所有task所在的机 器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应 的value都会汇聚到同一个节点上去处理和聚合

//分区聚合产生shuffle过程
//reduceByKey过程可以指定在reduce阶段的分区数
//shuffle类的算子可以手动指定分区数,reduce的个数

conf.set("spark.default.parallelism","4")

repartition:可以通过shuffle改变分区数,不做任何逻辑上的处理,该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true
coalesce :根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,参数shuffle默认为false

object Demo11words {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo06cache").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val wordRDD = sc.textFile("sparkproject/data/words.txt", 2)
    println("wordRDD的分区数为:" + wordRDD.getNumPartitions)
    //如果没有shuffle过程,子RDD的分区个数由父RDD分区数决定
    val flatRDD = wordRDD.flatMap(line => line.split(","))
    println("flatRDD的分区数为:" + flatRDD.getNumPartitions)

    val wordsRDD = flatRDD.map(word => (word, 1))
    //也可以改变分区数
    val eeRDD = wordsRDD.repartition(5)
    //分区聚合产生shuffle过程
    //reduceByKey过程可以指定在reduce阶段的分区数
    //shuffle类的算子可以手动指定分区数,reduce的个数

    
    conf.set("spark.default.parallelism","4")
    val reduceRDD = wordsRDD.reduceByKey(_ + _)
    println("reduceRDD的分区数为:" + reduceRDD.getNumPartitions)
  }
}
4、SortShuffle的运行机制(2种)

早期的shuffle是hashshuffle,每个task最终一起进行shuffle,导致每个核多个task,小文件较多
改进的hashshuffle机制,每个核里面的文件先聚合,每个核一个文件进行shuffle,但是任务过多,核过多小文件还是多
最终使用sorkshuffle

普通运行机制:10000条写一次(根据数据量,溢写排序)(mapreduce是80M溢写) 在划分 stage 时,最后一个 stage
称为 finalStage,它本质上是一个 ResultStage 对象,前

面的所有 stage 被称为 ShuffleMapStage。

ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘

bypass运行机制(不排序,提高效率)

bypass运行机制的触发条件如下:
shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值
不是聚合类的 shuffle 算子

5、任务调度

driver是一个进程,applicationMaster是一个进程(集群模式就是将Driver放在内部)

Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler和 TaskScheduler。
➢ DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度。

➢ TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统

6、 Spark Task 级调度

TaskSetManager 负 责监控 管理 同一 个 Stage 中的 Tasks, TaskScheduler 就是以TaskSetManager 为单元来调度任务

7、内存

统一内存管理

存储内存和执行内存:
若存储内存不够借用执行内存,当执行内存需要时必须还回去
若执行内存不够借用存储内存,当存储内存需要时不用还回去(存储丢了可以找回,时间资源而已,但是执行内存不够导致执行结果不准确)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582430.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号