栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink作业开发清单(4)——State/状态

Flink作业开发清单(4)——State/状态

建议阅读:

Working with State状态管理及容错机制

如果当前的计算依赖于前面数据产生的结果,那就需要依赖状态;比如Word Count,需要通过状态来保存前面数据的统计结果。

状态类型

 Flink Managed State分为两类,一是Keyed State,二是Operator State。

用户经常用到的是Keyed State。

Keyed State

几种 Keyed State 的差异具体体现在:

ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。ListState 状态数据类型是 List,访问接口如 add、update 等。ReducingState 和 AggregatingState 与 ListState 都是同一个父类,但状态数据类型上是单个值,原因在于其中的 add 方法不是把当前的元素追加到列表中,而是把当前元素直接更新进了 Reducing 的结果中。AggregatingState 的区别是在访问接口,ReducingState 中 add(T)和 T get() 进去和出来的元素都是同一个类型,但在 AggregatingState 输入的 IN,输出的是 OUT。

示例代码

在FlatMap中使用State:

public class CountWindowAverage extends RichFlatMapFunction, Tuple2> {

    
    private transient ValueState> sum;

    @Override
    public void flatMap(Tuple2 input, Collector> out) throws Exception {

        // access the state value
        Tuple2 currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

状态清理

如果状态不断累计的话,势必会造成内存和效率问题,所以状态的正确清理非常重要。可以在RichXXXFunction中通过Timer定期清理State,也可以使用Flink提供的TTL State。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

线上已经默认开启了“后台清理”,一般用户通过上述代码即可正常使用状态清理,如果想对“后台清理”有更细致的控制,请查看官方文档。

对于State Backend为filesystem的用户,建议添加如下配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780570.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号