栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink容错机制

大数据之flink容错机制

一、概念理解

1、State状态

Flink实时计算程序为了保证计算过程中,出现异常可以容错,就要将中间的计算结果数据存储起来,这些中间数据就叫做State。

2、StateBackEnd

用来保存State的存储后端就叫做StateBackEnd,默认是保存在JobManager的内存中,也可以保存的本地文件系统或HDFS这样的分布式文件系统

3、CheckPointing

Flink实时计算为了容错,可以将中间数据定期保存到起来,这种定期触发保存中间结果的机制叫CheckPointing

二、重启策略

设置重启策略:固定间隔、失败率、无限重启

package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//设置重启策略
public class RestartStrategyDemo1 {

    public static void main(String[] args) throws Exception{

        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置重启策略
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(3)));  //30秒内不能达到3次,每次重启延迟时间为3秒
        //开启checkpoint
        env.enableCheckpointing(10000); //如果开启checkpoint,默认的重启策略是无限重启

        //创建DataStream
        //Source
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出现异常了!!!!!");
                    }
                    //new Tuple2(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream, String> keyed = wordAndOne.keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple2 tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator> summed = keyed.sum(1);

        //Transformation结束s
        //调用Sink
        summed.print();

        //启动执行
        env.execute("StreamingWordCount");

    }
}

三、

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751993.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号