原文链接:Flink 高阶编程:状态编程
感觉写的还是挺不错的,例子举的也比较浅显易懂。接下来会对于重点进行摘抄记录。
状态定义需要记住多个事件信息的操作就是有状态的,例如一段时间内水位平均值,最高值;一个操作仅需要当前独立事件就是无状态的,例如当水位超过20cm就报警。
应用场景- 去重检测:对比之前状态,判断是否有变化;聚合:时间窗口进行聚合,最大值/最小值/平均值更新机器学习模型
自己绘制的一张分类图,帮助记忆
| Managed State托管状态 | Raw State原生状态 | |
|---|---|---|
| 管理方式 | 托管,自动恢复伸缩 | 自行维护 |
| 数据结构 | Keyed State(键控状态) Operator State(算子状态) | 字节数组 |
| 使用 | 集成Rich函数类或其他接口类 | 自定义使用 |
| Operator State算子状态 | Keyed State键控状态 | |
|---|---|---|
| 使用条件 | 所有算子 | 只使用 KeyedStream 上的算子 |
| 状态分配 | 一个算子的子任务一个,即跟着并行度走 | 一个key一个,即跟着key走 |
| 创建方式 | 实现 CheckpointedFunction | 重写 RichFunction |
| 横向拓展 | 均匀分配或合并后每个得到全量 | 并发改变, State 随着 Key 在实例间迁移 |
| 数据类型 | ListState/BroadCastState | ValueState/ListState/MapState/ReduceState/ AggregatingState |
每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。Operator State 经常被用在 Source 或 Sink 等算子 上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-once 语义。
算子子任务之间的状态不可互相访问,即状态1不可以访问状态2
ListState//调用 implements MapFunctionBroadCastState, CheckpointedFunction listState.add(value); //cp @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("snapshotState..."); for (String ignored : listState.get()) { num++; } } //初始化 @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("初始化..."); listState = context.getOperatorStateStore().getListState(new ListStateDescriptor ("num", String.class)); }
广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。
// 广播流 MapStateDescriptorKeyed State键控状态stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class); BroadcastStream broadcast = bDS.broadcast(stateDescriptor); ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor); String state = broadcastState.get("state"); @Override public void processBroadcastElement(String value, BroadcastProcessFunction .Context ctx, Collector out) throws Exception { // 把值放到广播流中 BroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor); broadcastState.put("state", value); }
Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构, 只能用于 KeyedStream(keyBy 算子处理之后)。
ValueState保存单个值,每有一个 key 就有一个状态值,T 为要保存值的泛型
new KeyedProcessFunctionListState() @Override public void open(Configuration parameters) throws Exception { // 必须在 open 生命周期初始化 valueState = getRuntimeContext().getState(new ValueStateDescriptor ("state", Double.class)); } Double lastValue = valueState.value();
保存元素列表。
new KeyedProcessFunctionMapState>() @Override public void open(Configuration parameters) throws Exception { listState = getRuntimeContext().getListState(new ListStateDescriptor ("", Double.class)); } listState.add(value.getVc()); listState.update(list);
存储键值对列表。
new KeyedProcessFunctionReducingState>() @Override public void open(Configuration parameters) throws Exception { mapState = getRuntimeContext().getMapState(new MapStateDescriptor ("state", Double.class, Integer.class)); } mapState.put(value.getVc(), 1); for (Double key : mapState.keys())
存储单个值, 表示把所有元素的聚合结果添加到状态中。与 ListState 类似, 但是当使用 add(T)的时候 ReducingState 会使用指定的 ReduceFunction 进行聚合。
@Override
public void open(Configuration parameters) throws Exception {
reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor("state", Double::sum, Double.class));
}
//注意Double::sum
reducingState.add(value.getVc());
AggregatingState存储单个值。与 ReducingState 类似, 都是进行聚合。不同的是 AggregatingState 的聚合的结果和元素类型可以不一样。
@Override
public void open(Configuration parameters) throws Exception {
aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor, Double>(
"state", new MyAggFunction(),Types.TUPLE(Types.INT, Types.DOUBLE)));
}
private static class MyAggFunction implements AggregateFunction, Double> {
@Override
public Tuple2 createAccumulator() {
return Tuple2.of(0, 0D);
}
@Override
public Tuple2 add(WaterSensor value, Tuple2 accumulator) {
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getVc());
}
@Override
public Double getResult(Tuple2 accumulator) {
return accumulator.f1 / accumulator.f0;
}
@Override
public Tuple2 merge(Tuple2 a, Tuple2 b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}



