栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Spark Core】累加器

【Spark Core】累加器

累加器 定义
  • 累加器是分布式的共享只写变量
    共享:累加器的值由Driver端共享给Executor端
    只写:Executor端互相之间读取不到对方的累加器

  • 累加器可以替换一些需要shuffle的操作

问题引入
package SparkCore._06_累加器

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object _01_ {
  def main(args: Array[String]): Unit = {
    val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
    val sparkContext = new SparkContext(sc)
    val sourceRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    //TODO 1.用累加替换reduce()
    //直接遍历就完事了
    var sum = 0;
    sourceRDD.foreach(
      num=>{
        sum += num
      }
    )
    println("sum="+sum)
   
    sparkContext.stop()
  }
}

执行结果为0

原因分析

Driver端的变量,通过RDD的算子闭包发送到Executor端,Executor分别获取自己分区的数据,还有计算逻辑。

executor1计算: sum = 1+2 executor2计算: sum = 3+4

但是Executor计算完后,应该将计算结果返回到Driver端,Spark闭包只知道将数据带到Executor,不知道往Driver端返回。

引入累加器

累加器是Spark中第二种数据结构,Spark会将累加器从Driver传递到Executor,再将结果返回到Driver。

累加器用来把Executor端变量信息聚合到Driver端,在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge

累加器的使用
package SparkCore._06_累加器

import org.apache.spark.{SparkConf, SparkContext}


object _02_ {
  def main(args: Array[String]): Unit = {
    val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
    val sc  = new SparkContext(conf)
    val rdd = sc.makeRDD(List(1,2,3,4,5))
    //todo 1.获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
    var sum = sc.longAccumulator("sum"); //sum是累加器的名字

    rdd.foreach(
      num => {
        // todo 2.使用累加器
        sum.add(num)
      }
    )
    // todo 3.获取累加器的值
    println("sum = " + sum.value)
  }

}

系统累加器有:longAccumulator,DoubleAccumulator,还有CollectionAccumulator(List)

累加器使用注意事项 1.少加问题
val rdd = sc.makeRDD(List(1,2,3,4,5))
// 获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
var sum = sc.longAccumulator("sum"); //sum是累加器的名字
rdd.map(
  num => {
    // 使用累加器
sum.add(num)
num
  }
)
// 获取累加器的值
println("sum = " + sum.value)

执行结果:0
少加问题:如果在转换算子中调用累加器,如果没有行动算子的话,那么不会执行
解决:添加一个行动算子

package SparkCore._06_累加器

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object _03_ {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))

    var sum = sc.longAccumulator("sum"); //sum是累加器的名字
    val value: RDD[Int] = rdd.map(
      num => {
        // 使用累加器
        sum.add(num)
        num
      }
    )
   
    value.collect()
     // 获取累加器的值
    println("sum = " + sum.value)
  }
}

执行结果为:sum = 15

2.多加问题
val rdd = sc.makeRDD(List(1,2,3,4,5))
// 获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
var sum = sc.longAccumulator("sum"); //sum是累加器的名字
val mapRDD = rdd.map(
  num => {
    // 使用累加器
	sum.add(num)
	num
  }
)
mapRDD.collect()
mapRDD.collect()
// 获取累加器的值
println("sum = " + sum.value)

执行结果:30
多加问题:累加器在SparkContext环境中是全局共享的,行动算子调用一次会执行一次,如果有多个行动算子,就会累加多次。

3.累加器一般使用方式

解决方法:一般情况下,将累加器放在行动算子中进行操作。

自定义累加器
package SparkCore._06_累加器

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable






object _04_diy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List("hello","Spark","hello"))

    //todo 1.创建累加器
    val wcAcc = new myAcc()
    //todo 2.注册
    sc.register(wcAcc,"wordCountAcc")

    //todo 3.使用
    rdd.foreach(
      word=>{
        wcAcc.add(word)
      }
    )
    //todo 4.获取
    println(wcAcc.value)
  }

}
//6个方法
//泛型:IN 累加器输入的数据  out 累加器输出的数据类型
class myAcc extends AccumulatorV2[String,mutable.Map[String,Long]] {
  //创建一个Map做返回值
  private var wcMap = mutable.Map[String,Long]()

    //累加
  override def add(word: String): Unit = {
    val cnt: Long = wcMap.getOrElse(word, 0l)
    wcMap.update(word,cnt+1)
  }

  override def value: mutable.Map[String, Long] = {
    wcMap
  }
  //Driver端合并多个累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map1 = this.wcMap
    val map2 = other.value

    map2.foreach{
      case (word,cnt) =>{
        val newCnt = map1.getOrElse(word,0l) + cnt
        map1.update(word,newCnt)
      }
    }

  }

  override def isZero: Boolean = {
    wcMap.isEmpty
  }

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new  myAcc()
  }

  override def reset(): Unit = wcMap.clear()
}

Map(Spark -> 1, hello -> 2)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/312926.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号