栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink的详细使用

Flink的详细使用

Flink的详细使用

文章目录
  • Flink的详细使用
  • 1. 安装部署
    • 安装
  • 2. 执行任务
    • Standalone 模式
      • 启动/停止
      • 执行任务
    • Yarn 模式
      • Session-cluster 模式
        • 启动 yarn-session
        • 执行任务
      • Per-Job-Cluster 模式
  • 3. 执行环境
    • Environment
      • getExecutionEnvironment(常用)
      • createLocalEnvironment
      • createRemoteEnvironment
    • Source、Sink
    • Transform(算子)
      • map
      • flatMap
      • Filter
      • KeyBy
      • 滚动聚合算子
      • Reduce
      • Split 和 Select
      • Connect 和 CoMap
      • Union
      • 广播(broadcast)
  • 4. Window
    • TimeWindow
    • CountWindow
  • 5. 时间语义与 Watermark
    • 时间语义
    • Watermark(和 Window 一起使用)
      • 乱序时间的引用
      • 顺序时间的引用
  • 6. 状态管理
    • 键控状态(keyed state)
      • Keyed State 支持数据类型
      • 例子:ValueState
  • 7. ProcessFunction API
    • KeyedProcessFunction
    • TimerService 和 定时器(Timers)
    • 侧输出流(SideOutput)
  • 8. 检查点(CheckPoint)
  • 9. 状态一致性
    • 分类
    • 端到端 exactly-once
      • 事务写入
        • 预写日志(不常用)
        • 两阶段提交
    • Flink+Kafka 端到端状态一致性的保证
  • Maven(pom.xml)

1. 安装部署 安装
  • 第一步:将 flink-1.10.1-bin-scala_2.12.tgz 上传到服务器中并解压缩

  • 第二步:修改 conf/flink-conf.yaml 文件

    # 修改 jobmanager.rpc.address 参数,修改为 jobmanager 的机器
    jobmanager.rpc.address: hadoop151
    
  • 第三步:修改 conf/slaves 文件

    # slave 机器
    hadoop152
    hadoop153
    
  • 第四步:将 flink 整个目录分发到其他机器上

2. 执行任务 Standalone 模式 启动/停止
  • 命令

    # 启动
    bin/start-cluster.sh
    
    # 停止
    bin/stop-cluster.sh
    
  • 访问 web 页面

    • http://hadoop151:8081

执行任务
# =================== 启动任务 ===================
bin/flink run -c 全限定类名 –p 分区个数 jar包
# 示例
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# =================== 停止任务 ===================
bin/flink cancel JobId
# 示例
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

Yarn 模式 Session-cluster 模式 启动 yarn-session
  • 命令

    # =================== 启动 yarn-session ===================
    # -n(--container):TaskManager 的数量
    # -s(--slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余
    # -jm:JobManager 的内存(单位 MB)
    # -tm:每个 taskmanager 的内存(单位 MB)
    # -nm:yarn 的 appName(现在 yarn 的 ui 上的名字)
    # -d:后台执行
    bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
    
    
    # =================== 停止 yarn-session ===================
    yarn application -kill Application-Id
    # 示例
    yarn application -kill application_1633171918776_0003
    
  • 访问 web 页面

    • 启动 yarn-session 后会出现 web 地址,例如:http://hadoop153:42189

执行任务
# =================== 启动任务 ===================
bin/flink run -c 全限定类名 –p 分区个数 jar包
# 示例
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# =================== 停止任务 ===================
bin/flink cancel JobId
# 示例
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

Per-Job-Cluster 模式
# =================== 启动任务 ===================
bin/flink run –m yarn-cluster -c 全限定类名 –p 分区个数 jar包
# 示例
bin/flink run –m yarn-cluster -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# =================== 停止任务 ===================
bin/flink cancel JobId
# 示例
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

3. 执行环境 Environment getExecutionEnvironment(常用)
  • 创建一个执行环境,表示当前执行程序的上下文。 getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式

    // 普通运行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // 流式运行环境(常用)
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

createLocalEnvironment
  • 返回本地执行环境,需要在调用时指定默认的并行度

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    

createRemoteEnvironment
  • 返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");
    

Source、Sink
  • 官网地址

Transform(算子)
  • 官网地址
map
DataStream mapStram = dataStream.map(new MapFunction() {
    public Integer map(String value) throws Exception {
    }
});
flatMap
DataStream flatMapStream = dataStream.flatMap(new FlatMapFunction() {
    public void flatMap(String value, Collector out) throws Exception {
    }
});
Filter
DataStream filterStream = dataStream.filter(new FilterFunction() {
    public boolean filter(String value) throws Exception {
    }
});
KeyBy
  • DataStream → KeyedStream
  • 将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
滚动聚合算子
  • sum()
  • max()
  • min()
  • maxBy()
  • minBy()
Reduce
  • KeyedStream → DataStream
  • 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果
Split 和 Select

Split

  • DataStream → SplitStream
    • 根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream

Select

  • SplitStream→DataStream
    • 从一个 SplitStream 中获取一个或者多个DataStream
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
    DataStream inputDataStream = env.readTextFile(filePath);
    
    DataStream map = inputDataStream.map(new MapFunction() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });
    
    KeyedStream keyedStream = map.keyBy("id");

    // split:分流
    SplitStream splitStream = keyedStream.split(new OutputSelector() {
        public Iterable select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("h") : Collections.singletonList("d");
        }
    });

    // select:选择一个或多个 DataStream
    DataStream resultDataStream = splitStream.select("d");

    env.execute();
}

Connect 和 CoMap

Connect

  • DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立

CoMap

  • ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
    DataStream inputDataStream = env.readTextFile(filePath);

    DataStream map = inputDataStream.map(new MapFunction() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    KeyedStream keyedStream = map.keyBy("id");

    SplitStream splitStream = keyedStream.split(new OutputSelector() {
        public Iterable select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("high") : Collections.singletonList("low");
        }
    });

    DataStream highDataStream = splitStream.select("high");
    DataStream lowDataStream = splitStream.select("low");

    // connect & CoMapFunction:合流
    ConnectedStreams connectedStreams = highDataStream.connect(lowDataStream);
    
    DataStream resultDataStream = connectedStreams.map(new CoMapFunction() {
        public Object map1(SensorsData value) throws Exception {
            return value;
        }

        public Object map2(SensorsData value) throws Exception {
            return value;
        }
    });

    env.execute();
}
 

Union
  • DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream
  • 连接流的类型一样

广播(broadcast)
  • DataStream → DataStream:向每个分区广播元素

    dataStream.broadcast();
    

4. Window

官网地址

  • TimeWindow:按照时间生成 Window

  • CountWindow:按照指定的数据条数生成一个 Window,与时间无关

TimeWindow
// 窗口大小为 10s,每次滑动 5s
DataStream> resultDataStream = stringDataSource
                .flatMap(new MyFlatMapFunction())
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .sum(1);

CountWindow
// 窗口大小为 10 个数,每次滑动 5 个数
DataStream> resultDataStream = stringDataSource
                .flatMap(new MyFlatMapFunction())
                .keyBy(0)
                .countWindow(10, 5)
                .sum(1);

5. 时间语义与 Watermark 时间语义
  • Event Time:是事件创建的时间
  • Ingestion Time:是数据进入 Flink 的时间
  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间

时间语义的引入

// EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Watermark(和 Window 一起使用)
  • Watermark 是一种衡量 Event Time 进展的机制
  • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现
  • Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时 时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行

乱序时间的引用
  • 接口:AssignerWithPeriodicWatermarks

  • 使用前设置时间语义

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //周期性的生成 watermar,默认周期是 200 毫秒
        env.getConfig().setAutoWatermarkInterval(5000);
    
        String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
        DataStream inputDataStream = env.readTextFile(filePath);
    
        DataStream map = inputDataStream.map(new MapFunction() {
            public SensorsData map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
            }
        });
    
        //乱序时间情况下的 watermark
        //Time.milliseconds(1000) :延迟时间,1000ms
        DataStream eventTimeDataStream = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1000)) {
            @Override
            public long extractTimestamp(SensorsData element) {
                return element.getTimestamp();
            }
        });
    
        env.execute();
    }
    
    
    // 类
    public class SensorsData {
        private String id;
        private Long timestamp;
        private double wendu;
    }
    

顺序时间的引用
  • 接口:AssignerWithPunctuatedWatermarks

  • 使用前设置时间语义

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        String filePath = "E:\~fzk\java\IDEA\bigdata\FlinkStudyDemo\test\test1";
        DataStream inputDataStream = env.readTextFile(filePath);
    
        DataStream map = inputDataStream.map(new MapFunction() {
            public SensorsData map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
            }
        });
    
        //顺序时间情况下的 watermark
        DataStream eventTimeDataStream = map.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
            @Override
            public long extractAscendingTimestamp(SensorsData element) {
                return element.getTimestamp();
            }
        });
    
        env.execute();
    }
    
    
    // 类
    public class SensorsData {
        private String id;
        private Long timestamp;
        private double wendu;
    }
    

6. 状态管理 键控状态(keyed state)
  • 键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)
  • 存储一份状态值
Keyed State 支持数据类型
  • ValueState保存单个的值,值的类型为 T
    • get 操作: ValueState.value()
    • set 操作: ValueState.update(T value)
  • ListState保存一个列表,列表里的元素的数据类型为 T
    • ListState.add(T value)
    • ListState.addAll(List values)
    • ListState.get()返回 Iterable
    • ListState.update(List values)
  • MapState保存 Key-Value 对
    • MapState.get(UK key)
    • MapState.put(UK key, UV value)
    • MapState.contains(UK key)
    • MapState.remove(UK key)

例子:ValueState
  • 我们可以利用 Keyed State,实现这样一个需求:检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

    public class Test {
        public static void main(String[] args) throws Exception {
            //创建执行环节
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStream inputDataStream = env.socketTextStream("localhost", 9999);
    
            SingleOutputStreamOperator myBeanDataStream = inputDataStream.map(new MapFunction() {
                @Override
                public MyBean map(String s) throws Exception {
                    String[] split = s.split(" ");
                    return new MyBean(split[0], Double.valueOf(split[1]));
                }
            });
    
            SingleOutputStreamOperator> resultDataStream = myBeanDataStream
                    .keyBy((KeySelector) data -> data.getId())
                    .flatMap(new MyRichFlatMapFunction(10.0));
    
            resultDataStream.print();
    
            env.execute();
        }
    }
    
    
    // 富方法:存储状态值
    public class MyRichFlatMapFunction extends RichFlatMapFunction> {
        private ValueState myValueState;
      
        private Double abs;
    
        public MyRichFlatMapFunction(Double abs) {
            this.abs = abs;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 创建状态值
            myValueState = getRuntimeContext().getState(new ValueStateDescriptor("my-flatmap", Double.class));
        }
    
        @Override
        public void flatMap(MyBean myBean, Collector> collector) throws Exception {
            // 获取状态值
            Double lastWendu = myValueState.value();
            if(lastWendu != null){
                double absWebdu = Math.abs(myBean.getWendu() - lastWendu);
                if (absWebdu > abs){
                    collector.collect(new Tuple3<>(myBean.getId(), lastWendu, myBean.getWendu()));
                }
            }
            // 修改状态值
            myValueState.update(myBean.getWendu());
        }
    
        @Override
        public void close() throws Exception {
            // 清除状态值
            myValueState.clear();
        }
    }
    
    
    // 实体类
    public class MyBean {
        private String id;
        private Double wendu;
    }
    

7. ProcessFunction API
  • DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等
  • Flink 提供了 8 个 Process Function
    • ProcessFunction
    • KeyedProcessFunction
    • CoProcessFunction
    • ProcessJoinFunction
    • BroadcastProcessFunction
    • KeyedBroadcastProcessFunction
    • ProcessWindowFunction
    • ProcessAllWindowFunction

KeyedProcessFunction
  • KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。

  • KeyedProcessFunction还额外提供了两个方法

    • processElement
      • 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)
    • onTimer
      • 回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。onTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
    class MyKeyedProcessFunction extends KeyedProcessFunction {
        @Override
        public void open(Configuration parameters) throws Exception {
        }
    
        @Override
        public void processElement(MyBean myBean, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            collector.collect(myBean);
        }
    
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction.onTimerContext ctx, Collector out) throws Exception {
        }
    
        @Override
        public void close() throws Exception {
        }
    }
    

TimerService 和 定时器(Timers)
  • Context 和 onTimerContext 所持有的 TimerService 对象拥有以下方法

    • long currentProcessingTime() :返回当前处理时间
    • long currentWatermark() :返回当前 watermark 的时间戳
    • void registerProcessingTimeTimer(long timestamp) :会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
    • void registerEventTimeTimer(long timestamp) :会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
    • void deleteProcessingTimeTimer(long timestamp) :删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
    • void deleteEventTimeTimer(long timestamp) :删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行
    class MyKeyedProcessFunction extends KeyedProcessFunction {
        @Override
        public void processElement(MyBean myBean, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            long currentProcessingTime = context.timerService().currentProcessingTime();
            long currentWatermark = context.timerService().currentWatermark();
            context.timerService().registerProcessingTimeTimer(10000l);
            context.timerService().registerEventTimeTimer(10000l);
            context.timerService().deleteProcessingTimeTimer(10000l);
            context.timerService().deleteEventTimeTimer(10000l);
        }
    }
    

侧输出流(SideOutput)
  • process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs

  • 事例:监控传感器温度值,将温度值低于 30 度的数据输出到 side output

    public class Test {
        public static void main(String[] args) throws Exception {
            //创建执行环节
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream inputDataStream = env.socketTextStream("localhost", 9999);
    
            SingleOutputStreamOperator> myBeanDataStream = inputDataStream.map(new MapFunction>() {
                @Override
                public Tuple2 map(String s) throws Exception {
                    String[] split = s.split(" ");
                    return new Tuple2(split[0], Double.valueOf(split[1]));
                }
            });
    
            //定义侧输出流
            OutputTag> outputTag = new OutputTag>("high-output") {};
    
            // 使用自定义算子:ProcessFunction
            SingleOutputStreamOperator> resultDataStream = myBeanDataStream
                    .process(new MyProcessFunction(30.0, outputTag));
    
            resultDataStream.print("low-wendu");
    
            // 获取侧输出流并输出
            resultDataStream.getSideOutput(outputTag).print("high-wendu");
    
            env.execute();
        }
    
    
    
        private static class MyProcessFunction extends ProcessFunction, Tuple2> {
            private Double wenduLimit;
            private OutputTag> outputTag;
    
            // 初始化
            public MyProcessFunction(Double wenduLimit, OutputTag> outputTag) {
                this.wenduLimit = wenduLimit;
                this.outputTag = outputTag;
            }
    
            @Override
            public void processElement(Tuple2 myBean, ProcessFunction, Tuple2>.Context context, Collector> collector) throws Exception {
                // 温度高于限制温度就将数据加入到侧输出流,否则正常输出
                if(myBean.f1 > wenduLimit){
                    context.output(outputTag, myBean);
                }else {
                    collector.collect(myBean);
                }
            }
        }
    }
    

8. 检查点(CheckPoint)
  • 在执行流应用程序期间,Flink 会定期保存状态的一致检查点
  • 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
  
    // 检查点数据保存位置
    // RocksDB
    env.setStateBackend(new RocksDBStateBackend(new URI("hdfs:///tmp/check-point-test")));
    // 文件
    //env.setStateBackend(new FsStateBackend("file:///tmp/check-point-test"));
    // 内存
    //env.setStateBackend(new MemoryStateBackend());

    // 检查点配置
    // 设置 checkpoint 周期时间
    env.enableCheckpointing(10000l);
    // 设置 exactly-once 模式
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 设置检查点在被丢弃之前可能需要的最长时间
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // 设置检查点尝试之间的最小暂停
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // 设置可能同时进行的检查点尝试的最大次数
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 使检查点能够在外部持久化,当拥有的作业失败或暂停时
    env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // 重启策略
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 60000));
}

9. 状态一致性 分类
  • AT-MOST-ONCE(最多一次)
    • 当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。最多处理一次事件
  • AT-LEAST-ONCE(至少一次)
    • 在大多数的真实应用场景,我们希望不丢失事件。所有的事件都得到了处理,而一些事件还可能被处理多次
  • EXACTLY-ONCE(精确一次)
    • 恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次

端到端 exactly-once
  • 内部保证 :checkpoint
  • source :可重设数据的读取位置
  • sink :从故障恢复时,数据不会重复写入外部系统
    • 幂等写入
    • 事务写入

事务写入
  • 构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
  • 实现方式
    • 预写日志
    • 两阶段提交

预写日志(不常用)
  • 把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统
  • 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink 系统,都能用这种方式一批搞定
  • DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink

两阶段提交
  • 对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
  • 然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”
  • 当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入

Flink+Kafka 端到端状态一致性的保证
  • 内部
    • heckpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
  • source
    • kafka consumer 作为 source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
  • sink
    • kafka producer 作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

Maven(pom.xml)






    
        org.apache.flink
        flink-clients_${scala.binary.version}
        ${flink.version}

    
    
    
        org.apache.flink
        flink-sql-connector-kafka_${scala.binary.version}
        ${flink.version}
    
    

    
        org.apache.flink
        flink-connector-kafka_${scala.binary.version}
        ${flink.version}
        
            
                kafka-clients
                org.apache.kafka
            
        
    

    
        org.apache.flink
        flink-streaming-java_${scala.binary.version}

        
        
        
            
                slf4j-api
                org.slf4j
            
            
                commons-collections
                commons-collections
            
        
        ${flink.version}
    

    
    
        org.slf4j
        slf4j-api
        1.7.25
    
    
        org.slf4j
        slf4j-simple
        1.7.25
    

    
        mysql
        mysql-connector-java
        8.0.13
    

    
    
        com.google.guava
        guava
        18.0
    
    
        com.google.code.gson
        gson
        2.8.5
    

    
        com.fasterxml.jackson.core
        jackson-databind
        2.9.4
    

    
        org.apache.httpcomponents
        httpclient
        4.5.2
        
            
                commons-logging
                commons-logging
            
        
    

    
        org.projectlombok
        lombok
        1.18.4
    

    
        com.jayway.jsonpath
        json-path
        2.4.0
        compile
    

    
        joda-time
        joda-time
        2.9.9
    

    
        junit
        junit
        4.12
        test
    
    
    
        asm
        asm
        3.3.1
    
    
        asm
        asm-commons
        3.3.1
    
    
        asm
        asm-util
        3.3.1
    
    
        cglib
        cglib-nodep
        2.2.2
    

    
    
        org.apache.flink
        flink-statebackend-rocksdb_${scala.binary.version}
        compile
        
            
                slf4j-api
                org.slf4j
            
        
        ${flink.version}
    

    
        com.beust
        jcommander
        1.72
    

    
    
        com.alibaba
        druid
        1.1.21
    
    
        commons-dbutils
        commons-dbutils
        1.7
    

    
        com.alibaba
        fastjson
        1.2.72
    

    
        org.apache.kafka
        kafka_${scala.binary.version}
        ${kafka.version}
        compile
    

    
    
        org.apache.commons
        commons-collections4
        4.1
    

    
    
        com.github.oshi
        oshi-core
        3.5.0
    

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号