flink中状态如果不清理就会越来越大,实际上很多状态是可以清理的,比如说我们在计算日活时,使用日期作为key划分流,为了过滤掉重复的用户,在每个key内都维护了一个MapState。而我们实际上只关注当前日期的日活(因为之前的日活我们已经知道了),所有可以将之前日期的状态都清理。手动清理很麻烦,我们可以为状态设置超时时间,当超过这个时间之后,flink会自动清除这些数据:
static class RihuoProcessFunction extends ProcessFunction> {
private MapState hasLogin;
private ValueState count;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 设置超时时间,超过24小时后,就会被清除
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor mapStateDescriptor=new MapStateDescriptor("has login?",String.class,Boolean.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);
hasLogin = getRuntimeContext().getMapState(mapStateDescriptor);
ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("uv", Integer.class);
count=getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(LoginLog loginLog, ProcessFunction>.Context context, Collector> collector) throws Exception {
String uid = loginLog.getCommon().getUid();
// 如果还没登陆过
if (!hasLogin.contains(uid)){
hasLogin.put(uid,true);
if (count.value()==null){
count.update(1);
} else {
count.update(count.value()+1);
}
Date date = new Date(Long.parseLong(loginLog.getTs()));
// 这个month是从0到11的,我也懒得管它了
String logDate = date.getYear()+1900+"-"+date.getMonth()+"-"+date.getDate();
collector.collect(Tuple2.apply(logDate,count.value()));
}
}
}
这里设置的超时时间是系统时间,就是在插入数据时设置一个计时器,计时器到时间之后就会被清除(毕竟咱在这里也没有指定实践时间)。



