Mimicking Spark's built-in accumulators: extend AccumulatorV2 and override its six methods to build a custom word-count accumulator.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // Accumulator usage:
    // 1. Create the accumulator instance
    val wcACC = new MyAccumulator
    // 2. Register it with Spark so per-task partial results are merged back to the driver
    sc.register(wcACC, "wordCount")

    rdd.foreach(
      // Accumulate each word; foreach is an action, so each element is added exactly once
      word => wcACC.add(word)
    )

    // Read the merged result on the driver
    println(wcACC.value) // Map(spark -> 1, hello -> 2)
    sc.stop()
  }
  // Custom accumulator:
  // 1. Extend AccumulatorV2[IN, OUT]
  //      IN:  type of each value fed to the accumulator
  //      OUT: type of the result returned by value
  // 2. Override its six methods: isZero, copy, reset, add, merge, value
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    private var wcMap = mutable.Map[String, Long]()

    // Whether the accumulator is still in its zero (initial) state;
    // here, an empty map counts as zero
    override def isZero: Boolean = wcMap.isEmpty
    // Create a copy of this accumulator; per the AccumulatorV2 contract the
    // copy should carry the current contents, not start from empty
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      val newAcc = new MyAccumulator
      newAcc.wcMap ++= wcMap
      newAcc
    }
    // Reset the accumulator back to its zero state
    override def reset(): Unit = wcMap.clear()

    // Add one input value to this accumulator (runs on the executor side)
    override def add(word: String): Unit = {
      val newCnt = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCnt)
    }
    // Merge another accumulator of the same type into this one (driver side)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1 = this.wcMap
      val map2 = other.value
      map2.foreach { case (word, count) =>
        val newCount = map1.getOrElse(word, 0L) + count
        map1.update(word, newCount)
      }
    }
    // Current accumulated result
    override def value: mutable.Map[String, Long] = wcMap
  }
}
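
For comparison, here is what the built-in accumulator being mimicked looks like. This is a minimal sketch using sc.longAccumulator; the object and variable names are illustrative, not part of the original example.

import org.apache.spark.{SparkConf, SparkContext}

object BuiltInTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("builtInAcc"))
    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // Built-in Long accumulator: registered automatically by SparkContext
    val helloCnt = sc.longAccumulator("helloCount")
    rdd.foreach(word => if (word == "hello") helloCnt.add(1L))

    println(helloCnt.value) // 2
    sc.stop()
  }
}

The built-in longAccumulator can only sum Long values, which is why a Map-valued AccumulatorV2 is needed for a per-word count. In both versions, update the accumulator inside an action (foreach here): updates made inside transformations can be applied more than once if a stage is retried.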