栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink作业开发清单(3)——Checkpoint

Flink作业开发清单(3)——Checkpoint

Checkpoint

Checkpoint功能可以保证作业失败重启或升级重启后,从上次"离开"的位置继续运行;
比如Flink Kafka就是通过Checkpoint记录消费的Offset记录的;如果没有开启Checkpoint,那么每次重启作业可能会重复消费数据或者丢失数据(与配置相关);

1.开启Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointInterval(5 * 60 * 1000); // Checkpoint的触发频率;
config.setMinPauseBetweenCheckpoints(5 * 60 * 1000); // Checkpoint之间的最小间隔;
config.setCheckpointTimeout(10 * 60 * 1000); // Checkpoint的超时时间;
config.setTolerableCheckpointFailureNumber(3); // 连续3次checkpoint失败,才会导致作业失败重启;默认值是0 。
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // Cancel Job之后保留Checkpoint文件;

说明:

    Checkpoint间隔不宜过短,3~5min为宜。为了避免Checkpoint抖动(比如写HDFS超时)导致作业重启,可以配置config.setTolerableCheckpointFailureNumber(3),即Checkpoint连续失败一定次数才重启作业。

2.指定UID

Operator在恢复状态的时候,是通过“UID”来判断要恢复的状态的,即UID和状态唯一绑定。如果不手动指定UID,那么修改代码后UID可能发生变化,导致状态无法正常恢复。

强烈建议通过如下方式,给有状态的Operator指定UID(包括Source和Sink):

env.addSource(new MySource()).uid("my-source")
        .keyBy(anInt -> 0)
        .map(new MyStatefulFunction()).uid("my-map")
        .addSink(new DiscardingSink<>()).uid("my-sink");

3.State Backend

推荐使用RocksDB。

    filesystem

    当前状态数据会被保存在TaskManager的内存中(容易出现OOM问题);优势是状态存取效率比较高;

    rocksdb

    RocksDB 是一种嵌入式的本地数据库,当前状态数据会被保存在TaskManager的本地磁盘上;(不容易出现内存问题)状态存取效率比filesystem稍微低一些;

当使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。

当使用RocksDB的时候,建议添加如下配置:

containerized.heap-cutoff-ratio=0.5

RocksDB会使用JVM off-heap内存,该配置会提高off-heap占所有内存的比率,详细的可以看下面的内存配置。

4.内存配置

建议JobManager和TaskManager的内存最少2G(如果作业逻辑非常简单,可以调低TaskManager的内存);

用户配置的内存,并不是最终运行的Heap内存。Heap内存(-Xms和-Xmx)的具体值的计算可以简单的理解为如下公式:

用户配置内存 * (1 - containerized.heap-cutoff-ratio)

containerized.heap-cutoff-ratio的默认值为0.25,所以如果用户配置的内存为2G,则实际的Heap内存为2G * (1 - 0.25) = 1.5G,余下的0.5G会分配给DirectMemory。

因此:

如果Flink作业的TaskManager频繁被Yarn Kill,可以尝试调大"containerized.heap-cutoff-ratio=0.3”甚至更高,来避免Off Heap使用过多被Kill。

如果使用的RocksDB StateBackend,可以尝试调大"containerized.heap-cutoff-ratio=0.5”甚至更高,来提供给RocksDB更多可用内存。

5.Checkpoint目录

有两种方式配置Checkpoint目录:

在流式作业管理服务中配置【框架参数】:

state.backend=rocksdb
state.checkpoints.dir=hdfs://hadoop/user/xxxx/flink/flink-job-name
在代码中配置:
streamExecutionEnvironment.setStateBackend(new FsStateBackend("hdfs://hadoop/user/xxxx/flink/flink-job-name"))

特别注意:

Checkpoint目录要保证作业之间不冲突;如果同时使用了两种方式,会使用代码中指定的目录;推荐只使用第一种方式,因为第一种方式可以让流式作业管理服务自动处理Checkpoint的恢复;

6.恢复验证

可以在JobManager的Log文件中搜索是否存在如下日志:

INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 55001dbe3b54ee3505c0667bf028ebae from savepoint hdfs://xxxx/xxxx (allowing non restored state)
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 55001dbe3b54ee3505c0667bf028ebae to 158.
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 55001dbe3b54

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784912.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号