栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink去重方案

Flink去重方案

Flink去重

​ 去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。

以下介绍4种去重方式:

MapState去重

SQL去重

HyperLogLog去重

bitmap精确去重

一 MapState去重 1.1 实现步骤分析
    为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分数据分组使用广告位ID+点击事件所属的小时选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量计算完成之后的数据清理,按照时间进度注册定时器清理
1.2 实现 1.2.1 广告数据
case class AdData(id:Int,devId:String,time:Long)
1.2.2 分组数据
case class AdKey(id:Int,time:Long)
1.2.3 主流程
val env=StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaConfig=new Properties()
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)
    val ds=env.addSource(consumer)
      .map(x=>{
        val s=x.split(",")
        AdData(s(0).toInt,s(1),s(2).toLong)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {
      override def extractTimestamp(element: AdData): Long = element.time
    })
      .keyBy(x=>{
        val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,
          Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds
        AdKey(x.id,endTime)
      })

​ 指定时间时间属性,这里设置允许1min的延时,可根据实际情况调整;
时间的转换选择TimeWindow.getWindowStartWithOffset Flink在处理window中自带的方法,使用起来很方便,第一个参数 表示数据时间,第二个参数offset偏移量,默认为0,正常窗口划分都是整点方式,例如从0开始划分,这个offset就是相对于0的偏移量,第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位ID作为分组的Key。

1.3 去重逻辑

​ 自定义Distinct1ProcessFunction 继承了KeyedProcessFunction, 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后在输出;
定义两个状态:MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,mapstate中value作为rocksdb中的value, rocksdb中value 大小是有上限的,这种方式可以减少rocksdb value的大小;另外一个ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。

class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {
  var devIdState: MapState[String, Int] = _
  var devIdStateDesc: MapStateDescriptor[String, Int] = _

  var countState: ValueState[Long] = _
  var countStateDesc: ValueStateDescriptor[Long] = _

  override def open(parameters: Configuration): Unit = {

    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
    devIdState = getRuntimeContext.getMapState(devIdStateDesc)

    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
    countState = getRuntimeContext.getState(countStateDesc)
  }

  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {

    val currW=ctx.timerService().currentWatermark()
    if(ctx.getCurrentKey.time+1<=currW) {
        println("late data:" + value)
        return
      }

    val devId = value.devId
    devIdState.get(devId) match {
      case 1 => {
        //表示已经存在
      }
      case _ => {
        //表示不存在
        devIdState.put(devId, 1)
        val c = countState.value()
        countState.update(c + 1)
        //还需要注册一个定时器
        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)
      }
    }
    println(countState.value())
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {
    println(timestamp + " exec clean~~~")
    println(countState.value())
    devIdState.clear()
    countState.clear()
  }
}

​ 数据清理通过注册定时器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示当watermark大于该小时结束时间+1就会执行清理动作,调用onTimer方法。

在处理逻辑里面加了

val currW=ctx.timerService().currentWatermark()
if(ctx.getCurrentKey.time+1<=currW){
        println("late data:" + value)
        return
  }

​ 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果,做了一个类似window机制里面的一个延时判断,将延时的数据过滤掉,也可以使用OutputTag 单独处理。

二 SQL去重

​ 在MapState去重中介绍了使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑。本篇介绍如何使用sql方式完成去重。

​ 为了与离线分析保持一致的分析语义,Flink SQL 中提供了distinct去重方式,使用方式:

SELECT DISTINCT devId FROM pv

表示对设备ID进行去重,得到一个明细结果,那么我们在使用distinct来统计去重结果通常有两种方式, 仍然以统计每日网站uv为例。

2.1 第一种方式
SELECt datatime,count(DISTINCT devId) FROM pv group by datatime

​ 该语义表示计算网页每日的uv数量,其内部核心实现主要依靠DistinctAccumulator与CountAccumulator,DistinctAccumulator 内部包含一个map结构,key 表示的是distinct的字段,value表示重复的计数,CountAccumulator就是一个计数器的作用,这两部分都是作为动态生成聚合函数的中间结果accumulator,透过之前的聚合函数的分析可知中间结果是存储在状态里面的,也就是容错并且具有一致性语义的
其处理流程是:

    将devId 添加到对应的DistinctAccumulator对象中,首先会判断map中是否存在该devId, 不存在则插入map中并且将对应value记1,并且返回True;存在则将对应的value+1更新到map中,并且返回False

    只有当返回True时才会对CountAccumulator做累加1的操作,以此达到计数目的

2.2 第二种方式
select count(*),datatime from(select distinct devId,datatime from pv ) agroup by datatime

​ 内部是一个对devId,datatime 进行distinct的计算,在flink内部会转换为以devId,datatime进行分组的流并且进行聚合操作,在内部会动态生成一个聚合函数,该聚合函数createAccumulators方法生成的是一个Row(0) 的accumulator 对象,其accumulate方法是一个空实现,也就是该聚合函数每次聚合之后返回的结果都是Row(0),通过之前对sql中聚合函数的分析(可查看GroupAggProcessFunction函数源码), 如果聚合函数处理前后得到的值相同那么可能会不发送该条结果也可能发送一条撤回一条新增的结果,但是其最终的效果是不会影响下游计算的,在这里我们简单理解为在处理相同的devId,datatime不会向下游发送数据即可,也就是每一对devId,datatime只会向下游发送一次数据;

外部就是一个简单的按照时间维度的计数计算,由于内部每一组devId,datatime 只会发送一次数据到外部,那么外部对应datatime维度的每一个devId都是唯一的一次计数,得到的结果就是我们需要的去重计数结果。

2.3 两种方式对比
    这两种方式最终都能得到相同的结果,但是经过分析其在内部实现上差异还是比较大,第一种在分组上选择datatime ,内部使用的累加器DistinctAccumulator 每一个datatime都会与之对应一个对象,在该维度上所有的设备id, 都会存储在该累加器对象的map中,而第二种选择首先细化分组,使用datatime+devId分开存储,然后外部使用时间维度进行计数,简单归纳就是:
    第一种: datatime->Value{devI1,devId2…}
    第二种: datatime+devId->row(0)
    聚合函数中accumulator 是存储在ValueState中的,第二种方式的key会比第一种方式数量上多很多,但是其ValueState占用空间却小很多,而在实际中我们通常会选择Rocksdb方式作为状态后端,rocksdb中value大小是有上限的,第一种方式很容易到达上限,那么使用第二种方式会更加合适;这两种方式都是全量保存设备数据的,会消耗很大的存储空间,但是我们的计算通常是带有时间属性的,那么可以通过配置StreamQueryConfig设置状态ttl。
三 HyperLogLog去重 3.1 实现

​ HyperLogLog算法 也就是基数估计统计算法,预估一个集合中不同数据的个数,也就是我们常说的去重统计,在redis中也存在hyperloglog 类型的结构,能够使用12k的内存,允许误差在0.81%的情况下统计2^64个数据,在这种大数据量情况下能够减少存储空间的消耗,但是前提是允许存在一定的误差。关于HyperLogLog算法原理可以参考这篇文章:https://www.jianshu.com/p/55defda6dcd2里面做了详细的介绍,其算法实现在开源java流式计算库stream-lib提供了其具体实现代码,由于代码比较长就不贴出来(可以后台回复hll ,获取flink使用hll去重的完整代码)。

测试一下其使用效果,准备了97320不同数据:

public static void main(String[] args) throws Exception{

        String filePath = "000000_0";
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));

        Set values =new HashSet<>();
        HyperLogLog logLog=new HyperLogLog(0.01); //允许误差

        String line = "";
        while ((line = br.readLine()) != null) {
            String[] s = line.split(",");
            String uuid = s[0];
            values.add(uuid);
            logLog.offer(uuid);
        }
       
        long rs=logLog.cardinality();
    }

当误差值为0.01 时; rs为98228,需要内存大小int[1366] //内部数据结构
当误差值为0.001时;rs为97304 ,需要内存大小int[174763]
误差越小也就越来越接近其真实数据,但是在这个过程中需要的内存也就越来越大,这个取舍可根据实际情况决定。

在开发中更多希望通过sql方式来完成,那么就将hll与udaf结合起来使用,实现代码如下:

public class HLLDistinctFunction extends AggregateFunction {

    @Override public HyperLogLog createAccumulator() {
        return new HyperLogLog(0.001);
    }

    public void accumulate(HyperLogLog hll,String id){
      hll.offer(id);
    }

    @Override public Long getValue(HyperLogLog accumulator) {
        return accumulator.cardinality();
    }
}

定义的返回类型是long 也就是去重的结果,accumulator是一个HyperLogLog类型的结构。

测试:

case class AdData(id:Int,devId:String,datatime:Long)object Distinct1 {  def main(args: Array[String]): Unit = {
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    val tabEnv=StreamTableEnvironment.create(env)
    tabEnv.registerFunction("hllDistinct",new HLLDistinctFunction)
    val kafkaConfig=new Properties()
   kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)
    consumer.setStartFromLatest()
    val ds=env.addSource(consumer)
      .map(x=>{
        val s=x.split(",")
        AdData(s(0).toInt,s(1),s(2).toLong)
      })
    tabEnv.registerDataStream("pv",ds)
    val rs=tabEnv.sqlQuery(      """ select hllDistinct(devId) ,datatime
                                          from pv group by datatime
      """.stripMargin)
    rs.writeToSink(new PaulRetractStreamTableSink)
    env.execute()
  }
}

准备测试数据

1,devId1,1577808000000
1,devId2,1577808000000
1,devId1,1577808000000

得到结果:

4> (true,1,1577808000000)
4> (false,1,1577808000000)
4> (true,2,1577808000000)

其基本使用介绍到这里,后续还将进一步优化。

3.2 HyperLogLog去重优化

​ 在HyperLogLog去重实现中,如果要求误差在0.001以内,那么就需要1048576个int, 也就是会消耗4M的存储空间,但是在实际使用中有很多的维度的统计是达不到这个数据量,那么可以在这里做一个优化,优化方式是:初始HyperLogLog内部使用存储是一个set集合,当set大小达到了指定大小(1048576)就转换为HyperLogLog存储方式。这种方式可以有效减小内存消耗。

实现代码:

public class OptimizationHyperLogLog {
    //hyperloglog结构
    private HyperLogLog hyperLogLog;
    //初始的一个set
    private Set set;
     
    private double rsd;
    
    //hyperloglog的桶个数,主要内存占用
    private int bucket;

    public OptimizationHyperLogLog(double rsd){
        this.rsd=rsd;
        this.bucket=1 << HyperLogLog.log2m(rsd);
        set=new HashSet<>();      
       }

   //插入一条数据
    public void offer(Object object){
        final int x = MurmurHash.hash(object);
        int currSize=set.size();
        if(hyperLogLog==null && currSize+1>bucket){ 
           //升级为hyperloglog
           hyperLogLog=new HyperLogLog(rsd);
           for(int d: set){
               hyperLogLog.offerHashed(d);
           }
           set.clear();
        }

        if(hyperLogLog!=null){
            hyperLogLog.offerHashed(x);
        }else {
            set.add(x);
        }
    }

    //获取大小
    public long cardinality() {
      if(hyperLogLog!=null) return hyperLogLog.cardinality();
      return set.size();
    }
}

初始化:入参同样是一个允许的误差范围值rsd,计算出hyperloglog需要桶的个数bucket,也就需要是int数组大小,并且初始化一个set集合hashset;

数据插入:使用与hyperloglog同样的方式将插入数据转hash, 判断当前集合的大小+1是否达到了bucket,不满足则直接添加到set中,满足则将set里面数据转移到hyperloglog对象中并且清空set, 后续数据将会被添加到hyperloglog中;

这种写法没有考虑并发情况,在实际使用情况中也不会存在并发问题。

四 bitmap精确去重

​ 在前面提到的精确去重方案都是会保存全量的数据,但是这种方式是以牺牲存储为代价的,而hyperloglog方式虽然减少了存储但是损失了精度,那么如何能够做到精确去重又能不消耗太多的存储呢,这篇主要讲解如何使用bitmap做精确去重。

ID-mapping

在使用bitmap去重需要将去重的id转换为一串数字,但是我们去重的通常是一串包含字符的字符串例如设备ID,那么第一步需要将字符串转换为数字,首先可能想到对字符串做hash,但是hash是会存在概率冲突的,那么可以使用美团开源的leaf分布式唯一自增ID算法,也可以使用Twitter开源的snowflake分布式唯一ID雪花算法,我们选择了实现相对较为方便的snowflake算法(从网上找的),代码如下:

public class SnowFlake {

    
    private final static long START_STMP = 1480166465631L;

    
    private final static long SEQUENCE_BIT = 12; //序列号占用的位数

    private final static long MACHINE_BIT = 5;   //机器标识占用的位数

    private final static long DATACENTER_BIT = 5;//数据中心占用的位数

    
    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);

    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);

    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

    
    private final static long MACHINE_LEFT = SEQUENCE_BIT;

    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;

    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;

    private long datacenterId;  //数据中心

    private long machineId;     //机器标识

    private long sequence = 0L; //序列号

    private long lastStmp = -1L;//上一次时间戳

    public SnowFlake(long datacenterId, long machineId) {
        if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
            throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
        }
        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.datacenterId = datacenterId;
        this.machineId = machineId;
    }

    
    public synchronized long nextId() {
        long currStmp = getNewstmp();
        if (currStmp < lastStmp) {
            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
        }

        if (currStmp == lastStmp) {
            //相同毫秒内,序列号自增
            sequence = (sequence + 1) & MAX_SEQUENCE;
            //同一毫秒的序列数已经达到最大
            if (sequence == 0L) {
                currStmp = getNextMill();
            }
        } else {
            //不同毫秒内,序列号置为0
            sequence = 0L;
        }

        lastStmp = currStmp;

        return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
                | datacenterId << DATACENTER_LEFT       //数据中心部分
                | machineId << MACHINE_LEFT             //机器标识部分
                | sequence;                             //序列号部分
    }

    private long getNextMill() {
        long mill = getNewstmp();
        while (mill <= lastStmp) {
            mill = getNewstmp();
        }
        return mill;
    }

    private long getNewstmp() {
        return System.currentTimeMillis();
    }


}

snowflake算法的实现是与机器码以及时间有关的,为了保证其高可用做了两个机器码不同的对外提供的服务。那么整个转换流程如下图:

首先会从Hbase中查询是否有UID对应的ID,如果有则直接获取,如果没有则会调用ID-Mapping服务,然后将其对应关系存储到Hbase中,最后返回ID至下游处理。

UDF化

​ 为了方便提供业务方使用,同样需要将其封装成为UDF, 由于snowflake算法得到的是一个长整型,因此选择了Roaring64NavgabelMap作为存储对象,由于去重是按照维度来计算,所以使用UDAF,首先定义一个accumulator:

public class PreciseAccumulator{

    private Roaring64NavigableMap bitmap;

    public PreciseAccumulator(){
        bitmap=new Roaring64NavigableMap();
    }

    public void add(long id){
        bitmap.addLong(id);
    }

    public long getCardinality(){
        return bitmap.getLongCardinality();
    }
}

udaf实现

public class PreciseDistinct extends AggregateFunction {

    @Override public PreciseAccumulator createAccumulator() {
        return new PreciseAccumulator();
    }

    public void accumulate(PreciseAccumulator accumulator,long id){
        accumulator.add(id);
    }

    @Override public Long getValue(PreciseAccumulator accumulator) {
        return accumulator.getCardinality();
    }
}

那么在实际使用中只需要注册udaf即可。

参考: https://mp.weixin.qq.com/s/gnDvVPZh3JZArfmYii_trQ
https://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng==&mid=2651218664&idx=1&sn=7babf9a45d44df2e7d7f36c3b9e98e60&chksm=f2a32003c5d4a9154a07be428d858df9c332457b44b779e8a0a2c061df3b222187ff95460e7a&mpshare=1&scene=1&srcid=0207smlUIuucD2KtY465Fvla&sharer_sharetime=1644202659963&sharer_shareid=8d60858916c3f82614fc6e0541627176#rd

https://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247484077&idx=1&sn=8f1a30f5383c145f909904f2a9f04bd5&chksm=fe2b66e2c95ceff4a8e741805e866f8473838e6152242365509f17aa2c7361a8bca5691efb9d&mpshare=1&scene=1&srcid=0207XIFFuxOAwNGH11epSkhc&sharer_sharetime=1644202710856&sharer_shareid=8d60858916c3f82614fc6e0541627176#rd

https://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng==&mid=2651218682&idx=1&sn=d88ecc45b89c14ade879c10d379a73a3&chksm=f2a32011c5d4a9076d735c6d4f9e6b3af0efbf1f87ee6f26924d16c83bbac9aa233ebac21130&mpshare=1&scene=1&srcid=0207JrzqcdaRJFQbxHCeIzg7&sharer_sharetime=1644202721656&sharer_shareid=8d60858916c3f82614fc6e0541627176#rd

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730440.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号