栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink学习笔记(十二):Flink State生命周期 - Time-To-Live (TTL)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink学习笔记(十二):Flink State生命周期 - Time-To-Live (TTL)

在前面的 Flink学习笔记(十一):flink KeyedState运用介绍了如何使用state进行sum操作。但是数据流通常是长时间运行,那么存在的状态将越来越多,如何解决这个问题呢?

1、Flink State Time-To-Live (TTL)

Flink提供了StateTtlConfig机制进行处理。首先我们看下提供的策略类型:

  • TTL 刷新策略(默认OnCreateAndWrite)
策略类型描述
StateTtlConfig.UpdateType.Disabled禁用TTL,永不过期
StateTtlConfig.UpdateType.OnCreateAndWrite每次写操作都会更新State的最后访问时间
StateTtlConfig.UpdateType.OnReadAndWrite每次读写操作都会跟新State的最后访问时间
  • 状态可见性(默认NeverReturnExpired)
策略类型描述
StateTtlConfig.StateVisibility.NeverReturnExpired永不返回过期状态
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp可以返回过期但尚未被清理的状态值

具体可以参考flink的官方文档

里面有更具体的介绍,包括state类型,清理策略和相关例子

2、实例

还是上面文章中的一个例子

我们可以看到在keybystream中配置了StateTtlConfig,配置方式如下,当一个状态超过两秒后重新计算状态

StateTtlConfig ttlConfig = StateTtlConfig
         newBuilder(Time.seconds(2))
         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
         .build();
         
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
stateDescriptor.enableTimeToLive(ttlConfig);

当然清除状态可以使用cleanupIncrementally,如

StateTtlConfig ttlConfig = StateTtlConfig
         newBuilder(Time.seconds(2))
         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
         .cleanupIncrementally(10, true)
         .build();

我们看下完整代码

public class TestStateTtlConfig {
    private static final String[] FRUIT = {"苹果", "梨", "西瓜", "葡萄", "火龙果", "橘子", "桃子", "香蕉"};

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream> fruit = env.addSource(new SourceFunction>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(FRUIT[random.nextInt(FRUIT.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        fruit.keyBy(0).map(new RichMapFunction, Tuple2>() {
            private ValueState> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(Time.seconds(2))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .cleanupIncrementally(10, true)
                        .build();

                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
                stateDescriptor.enableTimeToLive(ttlConfig);
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2 map(Tuple2 tuple2) throws Exception {
                Tuple2 currentState = valueState.value();
                // 初始化 ValueState 值
                if (null == currentState) {
                    currentState = new Tuple2<>(tuple2.f0, 0);
                }

                Tuple2 newState = new Tuple2<>(currentState.f0, currentState.f1 + tuple2.f1);
                // 更新 ValueState 值
                valueState.update(newState);

                return Tuple2.of(newState.f0, newState.f1);
            }
        }).print();

        env.execute("fruit");
    }

执行结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684450.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号