栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 之 MapState使用总结

Flink 之 MapState使用总结

1. Flink MapState TTL 设置
public class IopvDeduplicateProcessFunction extends RichFlatMapFunction {
    private MapState mapState;

    
    @Override
    public void open(Configuration parameters) throws Exception {
        // MapState 状态的TTL 是针对状态的处理时间定义 (ProcessingTime)
        StateTtlConfig stateTtlConfig = StateTtlConfig
                // 状态有效时间    1天过期
                .newBuilder(Time.days(1))
                // 设置状态的更新类型
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // 已过期还未被清理掉的状态数据不返回给用户
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // 过期对象的清理策略 全量清理
                .cleanupFullSnapshot()
                .build();

        // MapState 状态管理配置
        MapStateDescriptor descriptor = new MapStateDescriptor("MapDescriptor", String.class, FundIopvIndicators.class);

        // 启用状态的存活时间
        descriptor.enableTimeToLive(stateTtlConfig);
        // 获取状态实例
        mapState = getRuntimeContext().getMapState(descriptor);

    }

    @Override
    public void flatMap(FundIopvIndicators iopvIndicators, Collector collector) throws Exception {
        String strTime = iopvIndicators.getStrTime();
        if (mapState.get(strTime) == null) {
            mapState.put(strTime, iopvIndicators);
            collector.collect(iopvIndicators);
        }
    }
}
2. TTL 过期时间 和 作用域

目前(Flink1.13.1)状态的TTL是针对于状态的处理时间定义的

表示状态的过期时间

通过newBuilder(Time.seconds(1))设置

一旦设置了 TTL,那么如果上次访问的时间戳 + TTL超过了当前时间,则表明状态过期了。

3. 状态时间戳更新时机
通过setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)设置


表示状态时间戳的更新的时机(延长状态有效期),是一个 Enum 对象。

如果设置为 Disabled,则表明不更新时间戳;

如果设置为 OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;

如果设置为 OnReadAndWrite,在状态创建、写入、读取均会更新状态的时间戳。

4. 过期状态处理策略(是否返回给用户)
通过setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)设置

ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;(即即使状态过期了,仍会把过期的状态返回给用户)

NeverReturnExpired,那么一旦这个状态过期了,那么永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰。(过期的状态不会返回给用户)

5. 过期状态清理策略
.cleanupIncrementally() 增量清理

.cleanupFullSnapshot() 全量清理

FULL_STATE_SCAN_SNAPSHOT ,对应的是 EmptyCleanupStrategy 类,表示对过期状态不做主动清理,当执行完整快照(Snapshot / Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小。唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。

为了应对这个问题,Flink 还提供了增量清理的枚举值,分别是针对 Heap StateBackend 的 INCREMENTAL_CLEANUP(对应 IncrementalCleanupStrategy 类)


以及对 RocksDB StateBackend 有效的 ROCKSDB_COMPACTION_FILTER(对应 RocksdbCompactFilterCleanupStrategy 类)

对于增量清理功能,Flink 可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于 RocksDB 的状态清理,则是通过 JNI 来调用 C++ 语言编写的 FlinkCompactionFilter 来实现,底层是通过 RocksDB 提供的后台 Compaction 操作来实现对失效状态过滤的。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/460718.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号