累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge(合并操作)
举个例子,计算1+2+3+4:
val rdd = sc.makeRDD(List(1,2,3,4))
var sum = 0
rdd.foreach(
num => {
sum += num
}
)
println("sum = " + sum)
//sum = 0,因为executer的sum值无法传到driver中。
//若用rdd.collect().foreach()则为10
使用累加器:
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
//sc.doubleAccumulator
//sc.collectionAccumulator
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
// 获取累加器的值
println(sumAcc.value)
需要注意的是:
容易出现少加/多加的情况,比如转换算子中调用累加器,如果没有行动算子的话,那么不会执行。一般情况下,累加器会放置在行动算子进行操作。



