Flink有两种基本的状态:算子状态(Operator State)和键控状态(Keyed State),他们的主要区别就是作用范围不一样,算子状态的作用范围就是限定为算子任务(也就是当前一个分区执行的时候,所有数据来了都能访问到状态)。键控状态中并不是当前分区所有的数据都能访问所有的状态,而是按照keyby之后的key做划分,当前key只能访问自己的状态。
1.1 Operator State算子状态顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。状态由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一并行任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Kafka Connector是一个很好的operator state 使用示例。
Kafka consumer 的每个并行实例都维护一个topic partition和offset的映射作为其operator state 。
当更改并行时,operator state 接口支持在并行的实例重新分配state。有多种执行此重新分配的方案。
算子状态三种基本数据结构:
BroadcastState 用于广播的算子状态。
BroadcastState是 Operator State的一种特殊类型。引入它是为了支持需要将一个流的记录广播到所有下游任务的用例,这些记录用于在所有子任务之间保持相同的状态。
ListState 将状态表示为一组数据的列表。
UnionListState 存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
1.2 键控状态的类型Flink的Keyed State支持的数据类型如下:
特点和注意:
1. 每个状态都有clear()是清空操作。 2.在使用键控状态的时候,流都要有被 group by 的字段。 3. 设置监控状态,在进行状态编程时需要通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据型为参数。 4.ttl 策略根据自己的业务需求来设置。 5. 状态存储要做到定时或者一定规则的处理清空,大数据量的话会导致oom.2. flink 的 ListState 使用
import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.util.Collections; import java.util.List; @Slf4j public class SnapshotProcessFunction extends RichFlatMapFunction{ //之前的操作记录 private transient ListState listState; @Override public void open(Configuration parameters) { StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(1)) // 仅在创建和写入时更新 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ListStateDescriptor descriptor = new ListStateDescriptor ("listDescriptor", Snapshot.class); descriptor.enableTimeToLive(ttlConfig); listState = getRuntimeContext().getListState(descriptor); } @Override public void flatMap(Snapshot snapshot, Collector out) throws Exception { //拿取状态 Iterable snapshots = listState.get(); //如果是空,则初始化 if (snapshots == null) { listState.addAll(Collections.emptyList()); } else { //不为空,添加 listState.add(snapshot); } List allEles = Lists.newArrayList(listState.get()); Snapshot prevSnapshot = null; if (allEles.size() >= 2) { prevSnapshot = allEles.get(allEles.size() - 2); } if (prevSnapshot != null) { snapshot.setLowPx(prevSnapshot.getLowPx()); snapshot.setHighPx(prevSnapshot.getHighPx()); snapshot.setIncrementVolume(snapshot.getVolume() - prevSnapshot.getVolume()); snapshot.setIncrementAmount(snapshot.getAmount().subtract(prevSnapshot.getAmount())); // 昨收价取上个周期的收盘价 snapshot.setPreClosepx(prevSnapshot.getLastPx()); } else { snapshot.setIncrementVolume(snapshot.getVolume()); snapshot.setIncrementAmount(snapshot.getAmount()); } out.collect(snapshot); // 这也算是一个状态清楚的处理逻辑吧,根据业务需要来处理。 // 先全部清除 listState.clear(); // 在把上一个周期的对象塞进集合 listState.add(allEles.get(allEles.size() - 1)); } }
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/



