栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink state使用

Flink state使用

简介

Flink相比其他流计算引擎,最大的优势就是号称是有状态的流计算。可见state在Flink中极其重要的位置。数据流是由一个个单独的事件按时间序列组合成的,虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器,即不关注状态,不需要过往信息),但有些操作会跨多个事件记住信息(例如窗口操作符)。这些操作称为有状态操作。

下面是一些有状态的操作的使用场景:

1)对一个时间窗口内的数据进行聚合分析
2)在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数
3)数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重等

除了这些有用的应用场景外,state也是Flink使用checkpoints和savepoints实现容错的关键。

现在Flink正在慢慢实现让用户在运行时从Flink外部访问state,当然在发展中,可能api等都会改变,目前不是很稳定,不过未来应该是个不错的功能。

state的分类

state主要分为两类:Keyed State和Operator State

Keyed State

Keyed State只能用在KeyedStream上,所以要先形成KeyedStream(使用stream.keyBy(…))。

Flink的数据模型不是基于键值对的。因此,不需要将数据集类型物理地打包到键和值中。键是“虚拟的”:它们被定义为在实际数据上的函数,以指导分组操作符。

对于Keyed State,Flink提供了几种现成的数据结构供我们使用:ValueState、ListState、ReducingState、AggregatingState、MapState。要注意理解,上面的5种state类型都是表示stream keyBy 的 key的value的state类型。为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。

ValueState:存储单一的值,即每个key只有一个值
ListState:存储一个list,即每个key有一个list值
MapState:存储一个map,即每个key有一个map值
ReducingState和AggregatingState与ListState同属于MergingState。与ListState不同的是,ReducingState只有一个元素,而不是一个列表。它的原理是新元素通过add(T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState与ReducingState类似,也只有一个元素,只不过AggregatingState的输入和输出类型可以不一样。ReducingState和AggregatingState与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。

因为本身支持这么多类型的,所以不要用ValueState去存list或者map这种数据类型,直接使用ListState和MapState效率会高很多。

State是通过RuntimeContext类获取的,所以使用State的地方就是rich functions,即实现RichFunction或其子接口,就可以获取State。在里面我们就可以通过StateTtlConfig设置State的TTL等。比如:

public class TTLCountMapFunction extends RichMapFunction, Tuple2> {
    private transient ValueState state;  
    ...
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(600))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("count-state", Long.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

StateTtlConfig相关的设置可以查看官网链接。

Flink 中 State 支持设置 TTL,TTL 只是将时间戳与 userValue 封装起来。
· MapState 的 TTL 是基于 UK 级别的
· ValueState 的 TTL 是基于整个 key 的

Operator State(non-keyed state)

Operator State是绑定到一个并行运算符实例(one parallel operator instance)的状态(即记录每个Task对应的状态值数据类型)。kafka connecttor是Flink中运算符状态使用的一个很好的示例。Kafka consumer的每个并行实例都维护一个主题分区和偏移的映射,作为其操作符状态。

在典型的有状态 Flink 应用程序中,你不需要Operator State。 它主要是一种特殊类型的状态,用于实现source/sink或你没有可以对状态进行分区的键的场景。

为了使用Operator State就得要实现CheckpointedFunction。请移步CheckpointedFunction说明。

Broadcast State是一种特殊的Operator State,有着特殊的应用场景,后续会说明如何使用,这里不再讲解。

state的存储

state的存储就是State Backends,在Flink1.13版本以前,老的分类是:MemoryStateBackend、FsStateBackend和RocksDBStateBackend。而在1.13版本以后分类就是:HashMapStateBackend和EmbeddedRocksDBStateBackend,再加上对应的storage。

下面列举新老对应关系:

MemoryStateBackend 相当于使用 HashMapStateBackend 和 JobManagerCheckpointStorage组合。

#flink-conf.yaml配置

state.backend: hashmap

# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend()); 

 FsStateBackend 相当于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。

#flink-conf.yaml配置

state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");


// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

RocksDBStateBackend 相当于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。

#flink-conf.yaml配置

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");


// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

使用建议

1. Keyed State如何清空state,state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法。

2. Operator State慎重使用长list

参考官方文档中对state的介绍和使用的页面:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780932.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号