通俗的将Flink的状态即为存储算子中的子任务的中间值,相当于我们web的session.这里需要注意的是子任务是个线程,且这个线程在不断地处理数据,那flink的state就是存储线程中间变量的一个解决方案cuiyaonan2000@163.com
参考版本为: v1.13.2
参考地址:
- Working with State | Apache Flink
- 概览 | Apache Flink
随着我们自定的算子实现,我们不得不考虑一个问题.即数据的状态,流式数据处理其实也是批量的处理,那我们的计算就会有依赖,即依赖上一步的结果或者相关的处理结果才能继续.所以这里就必须要了解Flink的状态管理
流计算一般分为有状态和无状态两种(这里的解决方案用大家都懂的意思就是,使用一个全局的变量来存储每次的计算结果.这里的全局变量是跨机器,跨线程的,因为我们的算子是有并行度的概念的.cuiyaonan2000@163.com)
- 无状态计算指的是处理过程中不依赖于之前的数据处理结果或其他中间数据;
- 而有状态的计算会维护状态,并基于最新数据和当前状态生成输出结果。
Flink 保证 exactly-once 主要是通过他的 checkpoint 和 savepoint 机制.
Flink 状态Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。(这里注意不同的算子,相同算子的并行子任务是无法共享状态的cuiyaonan2000@163.com)
如下为一个并行度为3的算子的状态示意图.
状态分类-
算子状态(Operator State): 算子状态的作用范围限定为算子任务
-
键控状态(keyed State):生产中应用案例较多,根据输入数据流中定义的key来维护和访问
keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,在Java/Scala API上可以通过 stream.keyBy(...) 得到 KeyedStream
状态存储工具类所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。这里注意不同的算子,相同算子的并行子任务是无法共享状态的cuiyaonan2000@163.com
-
ValueState
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。 -
ListState
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List ) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List ) 覆盖当前的列表。 -
ReducingState
: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。 -
AggregatingState
: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。 -
MapState
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map ) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
创建状态必须使用StateDescriptor,根据不同的状态类型可以创建如下的
- ValueStateDescriptor,
- ListStateDescriptor,
- AggregatingStateDescriptor,
- ReducingStateDescriptor
- MapStateDescriptor。
状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunction 中 RuntimeContext 提供如下方法:
- ValueState
getState(ValueStateDescriptor ) - ReducingState
getReducingState(ReducingStateDescriptor ) - ListState
getListState(ListStateDescriptor ) - AggregatingState
getAggregatingState(AggregatingStateDescriptor ) - MapState
getMapState(MapStateDescriptor )
示例:
@Override
public void open(Configuration config) {
MapStateDescriptor outputStateDescriptor
= new MapStateDescriptor(
// 状态的名字,说明同一个算子可以创建多个任务根据不同的名字
"outPutState",
//存储key的类型
String.class,
//存储value的类型
TravelStaticsOutputInfo.class);
//实际我们使用的是MapState
MapState firstStatusState = getRuntimeContext().getMapState(firstStatusStateDescriptor);
}
}
状态有效期 (TTL)
即设置状态什么时候过期,过期了怎么处理的设置.
在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
如上的代码详解:
TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项(过期时间设置)。
TTL 的更新策略(默认是 OnCreateAndWrite):----即刷过期时间从什么时候开始
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
- StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。
TTL清理策略默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理():
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
关于清理策略和优化官网还有很多设置,具体参考官网.
Operator State这里其实需要注意有的算子本身就有状态,这种的更难以管理和使用.



