栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink的容错重启和恢复

Flink的容错重启和恢复

一、重启机制

Flink的重启机制在代码中的设置:

1. NoRestart

表示发生错误不会重启

env.setRestartStrategy(RestartStrategies.noRestart());
2. EnableCheckpointRestart

默认重启策略,固定频率

env.enableCheckpointing(2000);
3. FailureRateRestart

设置失败次数,启动间隔,当重启次数达到设置的失败次数,结束重启终止作业。

// 失败次数5次,时间段为5分钟内,重启等待时间5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)));
4. FixedDelayRestart

以固定频率重启作业。

// 重启3次,每次间隔2秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS) ));
二、从CheckPoint重启

checkpoint是当flink遇到异常节点时,从当前自动保存的checkpoint进行重启

1. EnableCheckpoint

设置间隔,自动检查点,当重启作业时会从检查点继续

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS) ));

// 间隔20建立检查点
env.enableCheckpointing(20);
2. FsStateBackend

将检查点保存在文件系统。

// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS) ));

// 打开Checkpoint
env.enableCheckpointing(20);

env.setStateBackend(new FsStateBackend("file:///tmp/chkdir", false));

// 作业停止后依然保留checkpoint文件
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
三、从SavePoint重启

savepoint一般是当系统升级修复时,需要暂时停止程序,这时将当前状态保存为savepoint.

1. SavePoint存储

主要需要设置配置文件flink-conf.yaml

# 配置statebackend
state.backend: filesystem
# 配置checkpoint&savepoint
state.checkpoints.dir: file:///tmp/chkdir
state.savepoints.dir: file:///tmp/chkdir
# 配置失败重启策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 2 s
# 配置checkpoint保存个数
state.checkpoints.num-retained: 2
# 配置local recovery for this state backend
state.backend.local-recovery: true

程序

// 打开Checkpoint, 我们也可以用 -D  CLI设置
env.enableCheckpointing(20);
// 作业停止后保留CP文件
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

然后maven打包,并在flink页面submit即可

mvn clean package -DskipTests

  • 也可以在命令行中进行提交
    ./bin/flink run -m localhost:6123 -c (job函数绝对路径) (本地jar文件绝对路径)
    

具体的提交job参数可参考:Flink命令行

2. 作业恢复
  • 可在命令行中直接从savepoint恢复作业
./bin/flink run -m localhost:6123 -s file:///tmp/chkdir/b2817e8637ffa46aab6cba3d640e8e37/chk-159 -c (job函数绝对路径) (本地jar文件绝对路径)
  • 也可以在页面中提交
3. 手动创建savepoint文件
./bin/flink savepoint (jobID)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696390.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号