- 计算的中间结果
- 状态恢复 节点挂了重启 之前的状态要能恢复
- 调整并行度后 状态的并行度也要能一并调整
- 2.1 什么是算子
-
实际上是一个本地变量。算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的
- 2.2 算子状态的数据类型
- 列表状态:状态为一组值
- 联合列表状态:
- 广播状态:每个算子的状态都相同
- 2.3 算子状态的数据类型
public class StateTest1_OperatorState {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// socket文本流
DataStream inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型
DataStream dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个有状态的map操作,统计当前分区数据个数
SingleOutputStreamOperator resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
// 自定义MapFunction
public static class MyCountMapper implements MapFunction, ListCheckpointed{
// 定义一个本地变量,作为算子状态
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count++;
return count;
}
@Override
public List snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
@Override
public void restoreState(List state) throws Exception {
for( Integer num: state )
count += num;
}
}
}
3.键控状态
- 2.1 什么是键控状态
- 键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)
- 2.2 键控状态的数据类型
-
ValueState保存单个的值,值的类型为 T
get 操作: ValueState.value()
set 操作: ValueState.update(T value)
-
ListState保存一个列表,列表里的元素的数据类型为 T。
ListState.add(T value)
ListState.addAll(List values)
ListState.get()返回 Iterable
ListState.update(List values) -
MapState
保存 Key-Value 对 - MapState.get(UK key)
- MapState.put(UK key, UV value)
- MapState.contains(UK key)
- MapState.remove(UK key)
State.clear()是清空操作。
public class StateTest2_KeyedState {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// socket文本流
DataStream inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型
DataStream dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个有状态的map操作,统计当前sensor数据个数
SingleOutputStreamOperator resultStream = dataStream
.keyBy("id")
// 键控状态
.map( new MyKeyCountMapper() );
resultStream.print();
env.execute();
}
// 自定义RichMapFunction
public static class MyKeyCountMapper extends RichMapFunction{
private ValueState keyCountState;
// 其它类型状态的声明
private ListState myListState;
private MapState myMapState;
private ReducingState myReducingState;
@Override
public void open(Configuration parameters) throws Exception {
// 状态名字 状态类型
keyCountState = getRuntimeContext().getState(new ValueStateDescriptor("key-count", Integer.class, 0));
myListState = getRuntimeContext().getListState(new ListStateDescriptor("my-list", String.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor("my-map", String.class, Double.class));
// myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor())
}
@Override
public Integer map(SensorReading value) throws Exception {
// 其它状态API调用
// list state
for(String str: myListState.get()){
System.out.println(str);
}
myListState.add("hello");
// map state
myMapState.get("1");
myMapState.put("2", 12.3);
myMapState.remove("2");
// reducing state
// myReducingState.add(value);
myMapState.clear();
Integer count = keyCountState.value();
count++;
keyCountState.update(count);
return count;
}
}
}
4.状态后端


