栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flink设置状态超时时间

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink设置状态超时时间

flink中状态如果不清理就会越来越大,实际上很多状态是可以清理的,比如说我们在计算日活时,使用日期作为key划分流,为了过滤掉重复的用户,在每个key内都维护了一个MapState。而我们实际上只关注当前日期的日活(因为之前的日活我们已经知道了),所有可以将之前日期的状态都清理。手动清理很麻烦,我们可以为状态设置超时时间,当超过这个时间之后,flink会自动清除这些数据:

    
    static class RihuoProcessFunction extends ProcessFunction> {
        private MapState hasLogin;
        private ValueState count;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 设置超时时间,超过24小时后,就会被清除
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.hours(24))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            MapStateDescriptor mapStateDescriptor=new MapStateDescriptor("has login?",String.class,Boolean.class);
            mapStateDescriptor.enableTimeToLive(ttlConfig);
            hasLogin = getRuntimeContext().getMapState(mapStateDescriptor);

            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("uv", Integer.class);
            count=getRuntimeContext().getState(valueStateDescriptor);
        }

        @Override
        public void processElement(LoginLog loginLog, ProcessFunction>.Context context, Collector> collector) throws Exception {
            String uid = loginLog.getCommon().getUid();
            // 如果还没登陆过
            if (!hasLogin.contains(uid)){
                hasLogin.put(uid,true);
                if (count.value()==null){
                    count.update(1);
                } else {
                    count.update(count.value()+1);
                }

                Date date = new Date(Long.parseLong(loginLog.getTs()));
                // 这个month是从0到11的,我也懒得管它了
                String logDate = date.getYear()+1900+"-"+date.getMonth()+"-"+date.getDate();
                collector.collect(Tuple2.apply(logDate,count.value()));
            }
        }
    }

这里设置的超时时间是系统时间,就是在插入数据时设置一个计时器,计时器到时间之后就会被清除(毕竟咱在这里也没有指定实践时间)。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/867599.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号