栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink checkPoint容错机制配置

Flink checkPoint容错机制配置

Flink checkPoint容错机制配置

flink版本:flink1.13.1

code
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class StateBackendTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //  1.设置状态后台
        env.setStateBackend(new MemoryStateBackend()); // 本地内存状态后台
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        // 2. 设置检查点 每秒执行1次checkPoint
        env.enableCheckpointing(1000);

        // checkPoint 参数优化
        // checkPoint 模式,精确一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkPoint超时时间,60s
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置两次检查点的最小的时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //设置是否优先使用检查点恢复机制。默认为false,即checkpoint和savepoint之间采用就近原则,设为true,则优先使用checkpoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(false);
        // 设置可容忍的检查点失败数量,默认是0,即不允许任何checkpoint检查点失败,如果checkpoint失败则任务失败,直接重启
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        // 3. 设置重启策略
        
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,60000L));

        
        env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));


        env.execute("StateBackend");
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/585114.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号