文章目录
- 1、实现原理
- 2、案例检测
- 3、自定义累加器 --- wordCount
1、实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
2、案例检测
package test03_rdd.accumulator
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Accumulator {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 并设置 App 名称
val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
// 创建 SparkContext,该对象是提交 Spark App 的入口
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 方式一
val count1 = rdd.reduce(_+_)
println("count1_reduce",count1)
// 方式二
var sum = 0
val count2 = rdd.foreach(
num => {
sum += num
}
)
println("count2_foreach",count2)
sc.stop()
}
}
(count1_reduce,10)
(count2_foreach,())
通过上面的小案例我们设法实现累加的功能,reduce之前讲到过,分别进行分区内和分区间的计算,最终生成结果。同样的,按照分析,我们通过遍历数据得到每一个数据,然后通过sum进行累加求和,正常的应该是没有问题的,但是运行的结果却差强人意。
其实还是内部的原因,foreach遍历累加在Executor中进行,没有问题,但是计算好了之后并没有将计算的结果返回给Driver端,而Driver端的sum还是初始值0,println也在Driver端,所以此时输出会有问题。
而累加器的出现,就解决了节点计算,数据回送的问题。在初始的时候创建累加器,并将其发送到Executor端,Executor端计算完成后将累加器返回到Driver端,Driver端再将累加器中的数据进行合并,最终输出。
累加器是一个分布式共享只写变量:分布式在于它可以进行分发计算,共享在于Driver的累加器被多个Executor共享,只写变量在于累加器之间是互相读取不到的,不同的Executor之间累加器是独立的,只有Driver可以访问
使用Spark自带的累加器:
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 获取系统的累加器
val counter = sc.longAccumulator("sum")
rdd.foreach(
num => {
// 使用累加器
counter.add(num)
}
)
// 获取累加器的值
println(counter.value)
10
当我们把foreach算子转换为map算子的时候,运行结果又变成了0,这是因为map算子仅是转换算子,在没有触发job作业之前是不会执行的
rdd.map(
num => {
// 使用累加器
counter.add(num)
}
)
0
当我们将map操作转为RDD后,调用两次collect算子,会出现多加的情况。因为,当前累加器是全局的,每调用一次collect算子就会计算一次,两次collect也就会加两次。
val mapRDD = rdd.map(
num => {
// 使用累加器
counter.add(num)
}
)
mapRDD.collect()
mapRDD.collect()
20
注意:一般情况下,我们在行动算子中使用累加器!
返回顶部
3、自定义累加器 — wordCount
主要逻辑部分:
- 我们希望通过自定义累加器实现单词的统计,自定义累加器的使用需要向sc进行注册,注册完成后即可使用。
def main(args: Array[String]): Unit = {
// 创建 SparkConf 并设置 App 名称
val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
// 创建 SparkContext,该对象是提交 Spark App 的入口
val sc = new SparkContext(conf)
// 注册自定义的累加器
val myAccumulator = new MyAccumulator
// 向sc注册
sc.register(myAccumulator, "wc")
val rdd = sc.makeRDD(List("scala", "spark", "hadoop"))
rdd.foreach(
words => {
myAccumulator.add(words)
}
)
// 输出
println(myAccumulator.value)
sc.stop()
}
自定义累加器部分:
- 在定义累加器的时候我们需要使其继承AccoumulatorV2 ,并同时定义泛型
- IN:累加器输入的数据类型
- OUT:累加器返回的数据类型 mutable.Map[String,Long]
我们需要通过传入单词来进行统计,所以输入类型为String,在计算的时候通过单词进行统计,使用Map集合的形式,getOrElse()主要就是防范措施,如果有值,那就可以得到这个值,如果没有就会得到一个默认值;从API中可以看出,传入的参数是(key,default)这种形式,返回值是:如果有key那就get(key),如果没有,就返回default。这样也便于内部的计算。
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
// 创建接收器
private var wcMap = mutable.Map[String, Long]()
// 判断是否为初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
// 赋值一份
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
// 重置(清空)
override def reset(): Unit = {
wcMap.clear()
}
// 获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word, 0L) + 1
wcMap.update(word, newCnt)
}
// Driver合并多个累加器的结果
// 实际上就是两个mutable.Map的相加计算
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach {
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
// 累加器的结果
override def value: mutable.Map[String, Long] = wcMap
}
返回顶部



