栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

sparkstream 自定义map累加器 支持删除kv对

sparkstream 自定义map累加器 支持删除kv对

删除kv对方法: 先用add方法设置value为-1,然后用reset清除掉value=-1的值.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Time
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable
import scala.collection.mutable.ListBuffer


object AccumulatorIfDataOut {

    // 设计单例模式用来初始化 map累加器 这里主要是针对从checkpoint恢复,累加器会丢失的现象来处理.
    @volatile private var instance: AccumulatorIfDataOut[(Time, Int)] = null

    def getInstance(sc: SparkContext) = {
        if (instance == null) {
            synchronized {
                if (instance == null) {
                    instance = new AccumulatorIfDataOut[(Time, Int)]()
                }
            }
        }
        // 累加器需要在spark上下文中注册才能使用; sc必须是rdd.sparkContext 而不是 ssc.sparkContext
        if (!instance.isRegistered) {
            sc.register(instance)
        }
        instance
    }

}



class AccumulatorIfDataOut[T] private extends AccumulatorV2[T, mutable.Map[Time, Int]]
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433718.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号