建议阅读:
Working with State状态管理及容错机制
如果当前的计算依赖于前面数据产生的结果,那就需要依赖状态;比如Word Count,需要通过状态来保存前面数据的统计结果。
状态类型
Flink Managed State分为两类,一是Keyed State,二是Operator State。
用户经常用到的是Keyed State。
Keyed State
几种 Keyed State 的差异具体体现在:
ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。ListState 状态数据类型是 List,访问接口如 add、update 等。ReducingState 和 AggregatingState 与 ListState 都是同一个父类,但状态数据类型上是单个值,原因在于其中的 add 方法不是把当前的元素追加到列表中,而是把当前元素直接更新进了 Reducing 的结果中。AggregatingState 的区别是在访问接口,ReducingState 中 add(T)和 T get() 进去和出来的元素都是同一个类型,但在 AggregatingState 输入的 IN,输出的是 OUT。
示例代码
几种 Keyed State 的差异具体体现在:
ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。ListState 状态数据类型是 List,访问接口如 add、update 等。ReducingState 和 AggregatingState 与 ListState 都是同一个父类,但状态数据类型上是单个值,原因在于其中的 add 方法不是把当前的元素追加到列表中,而是把当前元素直接更新进了 Reducing 的结果中。AggregatingState 的区别是在访问接口,ReducingState 中 add(T)和 T get() 进去和出来的元素都是同一个类型,但在 AggregatingState 输入的 IN,输出的是 OUT。
在FlatMap中使用State:
public class CountWindowAverage extends RichFlatMapFunction, Tuple2 > { private transient ValueState > sum; @Override public void flatMap(Tuple2 input, Collector > out) throws Exception { // access the state value Tuple2 currentSum = sum.value(); // update the count currentSum.f0 += 1; // add the second field of the input value currentSum.f1 += input.f1; // update the state sum.update(currentSum); // if the count reaches 2, emit the average and clear the state if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor > descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint >() {}), // type information Tuple2.of(0L, 0L)); // default value of the state, if nothing was set sum = getRuntimeContext().getState(descriptor); } } // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print(); // the printed output will be (1,4) and (1,5)
状态清理
如果状态不断累计的话,势必会造成内存和效率问题,所以状态的正确清理非常重要。可以在RichXXXFunction中通过Timer定期清理State,也可以使用Flink提供的TTL State。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
线上已经默认开启了“后台清理”,一般用户通过上述代码即可正常使用状态清理,如果想对“后台清理”有更细致的控制,请查看官方文档。
对于State Backend为filesystem的用户,建议添加如下配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();


