定义
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
// T: (String, Int) 这里使用案例是统计词频 (单词,出现次数)
class WordCountAccumulator[T] private extends AccumulatorV2[T, mutable.Map[String, Int]] {
val _map = mutable.Map[String, Int]()
// 判断累加器是否是初始状态
override def isZero: Boolean = _map.isEmpty
override def copy(): AccumulatorV2[T, mutable.Map[String, Int]] = {
// 复制当前累加器内容到新的累加器里面并返回新的累加器
val newAcc = new WordCountAccumulator[T]
for (i <- _map) {
newAcc._map += i
}
newAcc
}
override def reset(): Unit = {
// 重置累加器数据
_map.clear()
}
override def add(v: T): Unit = {
// v应该是PariRDD
// 在累加器中累加单词个数
val wordMapping: (String, Int) = v.asInstanceOf[(String, Int)]
_map += Tuple2(wordMapping._1, wordMapping._2 + _map.getOrElse(wordMapping._1, 0))
}
override def merge(other: AccumulatorV2[T, mutable.Map[String, Int]]): Unit =