栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink基于State做千万用户的pv

Flink基于State做千万用户的pv

需求:记录每天某一页面下所有用户的访问次数和第一次访问的时间
解法:

  1. redis做缓存,每天一个map,设置ttl,用户访问次数做累积,过滤完先存到redis,sink的时候读redis,查出这个用户的总访问次数
  2. 用flink的keyby(user_id+date),生成count和min,使用checkpoint进行容错

    对于上边两种做法各有缺陷,第一种需要借助外部存储,任务出问题的时候重启无法保证累计不重复更新;第二种需要会占用大量的内存,无法清理过期的user_id+date,没几天就oom。
    此时我想到第一种解决方案,用keyby(user_id).process来解决,对每个user_id生成一个管道,用ValueState进行管理,管理两个值(count, min_time),然后判断日志的时间来确定是那一天,然后用state的ttl来解决占有内存的问题,但是后来使用ttl的时候就把我劝退了,state的ttl是惰性删除,过期了不访问就不会删除,那我还要他有个屁用。
    于是想到第二种方案,用keyby(user_id%1000).process来解决,保证用户会分到同一个桶里,维护一个MapState[date_time, Map(user, (count, min_time))],定期删除过期的date_time,一是就解决了ttl的问题,二是保证每次都能删除昨天的分区,三是分多个桶也会保证数据不倾斜。另外还有个优点,从checkponit中恢复的时候不会有重复累计count,保证数据的准确性,用的是FsStateBackend。

核心代码如下

env.setStateBackend(new FsStateBackend(s"hdfs:///flink/checkpoint/xxx/$jobName"));

map((user_id%1000, user_id, timestamp))
.keyby(_._1)
.process(new MonitorKeyedProcessFunction)


class MonitorKeyedProcessFunction() extends KeyedProcessFunction[Long, (Long, String, String, String), JSONObject] {

    private var state: MapState[String, java.util.HashMap[String, (Int, String)]] = _

    override def open(parameters: Configuration): Unit = {
      // 创建 ValueStateDescriptor
      val descriptor = new MapStateDescriptor[String, java.util.HashMap[String, (Int, String)]]("myState", classOf[String], classOf[java.util.HashMap[String, (Int, String)]])

      // 基于 ValueStateDescriptor 创建 ValueState
      state = getRuntimeContext.getMapState(descriptor)

    }

    override def processElement(value: (Long, String, String, String),
                                context: KeyedProcessFunction[Long, (Long, String, String, String), JSONObject]#Context,
                                out: Collector[JSONObject]): Unit = {

      val user_id = value._2
      val time = value._3
      val date = DateTimeUtil.tranTimeToString(time, "yyyy-MM-dd")
      var current = state.get(date)
      // 总数
      var count = 0
      // 第一次访问时间
      var first_modified = time

      // 初始化
      if (current == null) {
        current = new java.util.HashMap[String, (Int, String)]
      }
      if (current.keys.contains(user_id)) {
        val info = current(user_id)
        count = info._1
        first_modified = info._2
      }
      // 最小时间
      if (time <  first_modified)
        first_modified = time
      // 累加
      count += 1
      // 提交
      current.put(user_id, (count, first_modified))
      // 更新
      state.put(date, current)
      
      // 删除过期日期数据 1点左右
      val yesterday = System.currentTimeMillis()-(86400+3600)*1000
      val yesterdayStr = DateTimeUtil.tranTimeToString(yesterday, "yyyy-MM-dd")
      if(state.get(yesterdayStr)!=null){
        if(state.get(yesterdayStr).size()!=0){
          state.remove(yesterdayStr)
          println(s"删除此分区${yesterdayStr}的状态数据")
        }
      }
      // 输出
      val res = new JSONObject()
      res.put("user_id", user_id)
      res.put("count", count)
      res.put("min_time", first_modified)
      out.collect(res)
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/423511.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号