栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink状态编程学习摘要

Flink状态编程学习摘要

主要参考博客

原文链接:Flink 高阶编程:状态编程

感觉写的还是挺不错的,例子举的也比较浅显易懂。接下来会对于重点进行摘抄记录。

状态定义

需要记住多个事件信息的操作就是有状态的,例如一段时间内水位平均值,最高值;一个操作仅需要当前独立事件就是无状态的,例如当水位超过20cm就报警。

应用场景
    去重检测:对比之前状态,判断是否有变化;聚合:时间窗口进行聚合,最大值/最小值/平均值更新机器学习模型
状态分类

自己绘制的一张分类图,帮助记忆

 

状态分类
Managed State托管状态Raw State原生状态
管理方式托管,自动恢复伸缩自行维护
数据结构

Keyed State(键控状态)

Operator State(算子状态)

字节数组
使用集成Rich函数类或其他接口类自定义使用

托管状态分类
托管状态分类
Operator State算子状态Keyed State键控状态
使用条件所有算子只使用 KeyedStream 上的算子
状态分配一个算子的子任务一个,即跟着并行度走一个key一个,即跟着key走
创建方式实现 CheckpointedFunction重写 RichFunction
横向拓展均匀分配或合并后每个得到全量并发改变, State 随着 Key 在实例间迁移
数据类型ListState/BroadCastStateValueState/ListState/MapState/ReduceState/ AggregatingState

Operator State算子状态

每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。Operator State 经常被用在 Source 或 Sink 等算子 上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-once 语义。

算子子任务之间的状态不可互相访问,即状态1不可以访问状态2

ListState
//调用
implements MapFunction, CheckpointedFunction
listState.add(value);

//cp
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    System.out.println("snapshotState...");
    for (String ignored : listState.get()) {
    num++;
    }
}

//初始化
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    System.out.println("初始化...");
    listState = context.getOperatorStateStore().getListState(new ListStateDescriptor("num", String.class));
}
BroadCastState

广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。

// 广播流
MapStateDescriptor stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
BroadcastStream broadcast = bDS.broadcast(stateDescriptor);

ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
String state = broadcastState.get("state");

@Override
public void processBroadcastElement(String value, BroadcastProcessFunction.Context ctx, Collector out) throws Exception {
    // 把值放到广播流中
    BroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
    broadcastState.put("state", value);
}
Keyed State键控状态

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构, 只能用于 KeyedStream(keyBy 算子处理之后)。

ValueState

保存单个值,每有一个 key 就有一个状态值,T 为要保存值的泛型

new KeyedProcessFunction()

@Override
public void open(Configuration parameters) throws Exception {
    // 必须在 open 生命周期初始化
    valueState = getRuntimeContext().getState(new ValueStateDescriptor("state", Double.class));
}

Double lastValue = valueState.value();
ListState

保存元素列表。

new KeyedProcessFunction>()


@Override
public void open(Configuration parameters) throws Exception {
    listState = getRuntimeContext().getListState(new ListStateDescriptor("", Double.class));
}

listState.add(value.getVc());
listState.update(list);
MapState

存储键值对列表。

new KeyedProcessFunction>()


@Override
public void open(Configuration parameters) throws Exception {
    mapState = getRuntimeContext().getMapState(new MapStateDescriptor("state", Double.class, Integer.class));
}


mapState.put(value.getVc(), 1);
for (Double key : mapState.keys())
ReducingState

存储单个值, 表示把所有元素的聚合结果添加到状态中。与 ListState 类似, 但是当使用 add(T)的时候 ReducingState 会使用指定的 ReduceFunction 进行聚合。

@Override
public void open(Configuration parameters) throws Exception {
    reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor("state", Double::sum, Double.class));
}
//注意Double::sum


reducingState.add(value.getVc());
AggregatingState

存储单个值。与 ReducingState 类似, 都是进行聚合。不同的是 AggregatingState 的聚合的结果和元素类型可以不一样。

@Override
public void open(Configuration parameters) throws Exception {
    aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor, Double>(
"state", new MyAggFunction(),Types.TUPLE(Types.INT, Types.DOUBLE)));
}


    private static class MyAggFunction implements AggregateFunction, Double> {

        @Override
        public Tuple2 createAccumulator() {
            return Tuple2.of(0, 0D);
        }

        @Override
        public Tuple2 add(WaterSensor value, Tuple2 accumulator) {
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getVc());
        }

        @Override
        public Double getResult(Tuple2 accumulator) {
            return accumulator.f1 / accumulator.f0;
        }

        @Override
        public Tuple2 merge(Tuple2 a, Tuple2 b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746063.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号