栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark中的累加器和广播变量

Spark中的累加器和广播变量

目录
  • 累加器
  • 广播变量
  • 总结

累加器

用来修改Executor端取不到的一些值

我们这里做一个计算,来计算这里foreach了多少次,按照我们传统的思维,这里我们是可以计算出结果是1000次的,也就是有1000个学生,但我们看到结果并非如此

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Demo16Cache")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\BigDaTa\JAVA_Project\ShuJia01\data\students.txt")

    var i:Int = 0
    println("i is " + i)
    stuRDD.foreach(line=>{
      i += 1
      println(line)
    })
    println("i is " + i)

我们可以看到这里面的结果,两个i都是0,这是为什么呢

我们把这里的代码拿出来单独做一个分析
这里一段代码是在Driver端的JVM中运行的,一段代码是在Executor端的JVM中运行的,所以我们最后的代码是0

这时候我们非要做一个累加操作,就需要用一个累加器

也就是算子内部的代码要使用外部的变量

val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Demo16Cache")

    val sc: SparkContext = new SparkContext(conf)

    //在Driver端通过累加器创建一个变量l
    //可以当成long类型
    val l: LongAccumulator = sc.longAccumulator

    val stuRDD: RDD[String] = sc.textFile("D:\BigDaTa\JAVA_Project\ShuJia01\data\students.txt")

    println("l is " + l.value)
    stuRDD.foreach(line=>{
      //在算子内部使用累加器进行累加
      l.add(1)
    })
    //在Driver端获取累加器的最终结果
    println("l is " + l.value)

算子内部进行累加,算子外部计算出结果
这里我们可以看到结果已经被算出来了

广播变量

将数据发送到executor中,而不是task中
这里我们根据学号筛选出一部分学生

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Demo10Broadcast")

    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("D:\BigDaTa\JAVA_Project\ShuJia01\data\students.txt")

    val stuIDs = List("1500100001","1500100011","1500100021","1500100031")

    stuRDD.filter(stu=>{
      val id: String = stu.split(",")(0)
      stuIDs.contains(id)
    })foreach(println)
  }

可以看到结果顺利的出来了

我们再去仔细的分析这段代码

这里我们可以看到,被发送到executor中执行的代码里,包含了stuIDs这个List,而每一个task任务都包含着一个List,那我们这里就需要拿取很多次的List,再发送到executor中,需要网络传输,这样就会十分的浪费资源,于是我们可以使用spark中的广播变量
我们只需要将一份我们需要的Driver端的数据放到Executor中即可,在Executor中的每一个task都能找到这份数据

使用广播变量

val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Demo10Broadcast")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\BigDaTa\JAVA_Project\ShuJia01\data\students.txt")


    val stuIDs = List("1500100001","1500100011","1500100021","1500100031")

    //在driver端将stuIDs广播到每一个executor中
    val stuIDsBro: Broadcast[List[String]] = sc.broadcast(stuIDs)

//    stuRDD.filter(stu=>{
//      val id: String = stu.split(",")(0)
//      stuIDs.contains(id)
//    })foreach(println)

    stuRDD.filter(stu=>{
      val strings: Array[String] = stu.split(",")
      val id: String = strings(0)
      val stuIDsV: List[String] = stuIDsBro.value
      stuIDsV.contains(id)
    })foreach(println)
总结

累加器和广播变量的区别在于
累加器可以获取到Driver端的数据,并且做一个更改
广播变量可以获取到Driver端的数据发送到executor中,只是拿来使用,并不对Driver的数据做一个更改

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/460478.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号