栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

04-Flink Window API

04-Flink Window API

Window API
  • 1. Window 简介
    • 概念
    • 窗口类型
      • 滚动窗口(Tumbling Windows)
      • 滑动窗口(Sliding Windows)
      • 会话窗口(Session Windows)
  • 2. Window API
    • WindowAssigne
    • 窗口创建
    • 窗口函数
      • 增量聚合函数
      • 全量窗口函数
    • 计数窗口
    • 会话窗口
  • 3. 其他可选 API

1. Window 简介 概念
  • 一般真实的流都是无界的,怎么处理无界的数据?
  • 可以把无限的数据流进行切分,得到有限的数据集进行处理(即转换为有界流)。
  • 窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)进行分析。

窗口类型
  • 时间窗口(Time Window)

    • 滚动时间窗口
    • 滑动时间窗口
    • 会话窗口
  • 计数窗口(Count Window)

    • 滚动计数窗口
    • 滑动计数窗口
滚动窗口(Tumbling Windows)

  • 将数据以固定的窗口长度对数据进行切分。
  • 时间对齐,窗口长度固定,没有重叠。
滑动窗口(Sliding Windows)

  • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
  • 窗口长度固定,可以有重叠。
会话窗口(Session Windows)

  • 由一系列事件组合一个指定时间长度的 timeout 间隔组成,也就是一段时间没有接收到新数据就会生成新的窗口。
  • 特点:时间无对齐。
2. Window API

        window():

  • 我们可以用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他处理操作。注意window()方法必须在keyBy之后才能用。
  • Flink提供了更加简单的.timeWindow和.countWindow方法,用于定于时间窗口和计数窗口。
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        // 从文件中读取数据
        DataStream inputStream = env.readTextFile("test.txt");

        // 转换成 Score 类型
        DataStream dataStream = inputStream.map(new MapFunction() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 开窗测试
        dataStream.keyBy("course")
//                .countWindow(10, 2);
//                .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
//                .timeWindow(Time.seconds(2));
                  .window(TumblingEventTimeWindows.of(Time.seconds(15)));

        dataStream.print();

        env.execute();
WindowAssigne
  • window()方法接收的输入参数是一个 WindowAssigne
  • WindowAssigne 负责将每条输入到数据分发到正确到window中
  • Flink提供了通用的WindowAssigne
    • 滚动窗口(tumbling window)
    • 滑动窗口(sliding window)
    • 会话窗口(session window)
    • 全局窗口 (global window)
窗口创建
  • 滚动时间窗口
	.timeWindow(Time.seconds(15))
  • 滑动时间窗口
	.timeWindow(Time.seconds(15), Time.seconds(5))
  • 会话窗口
	.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
  • 滚动计数窗口
	.countWindow(5)
  • 滑动计数窗口
	.countWindow(5, 2)
窗口函数
  • window function定义了要对窗口中收集的数据做的计算操作
  • 可以分为两类:
    • 增量聚合函数
      • 每条数据到来就进行计算,保持一个简单的状态
      • Reduce Function, AggregateFunction
    • 全窗口函数
      • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
      • ProcessWindowFunction, WIndowFunction
增量聚合函数
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // socket 文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成 Score 类型
        DataStream dataStream = inputStream.map(new MapFunction() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 增量窗口
        DataStream resultStream = dataStream.keyBy("course")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(Score score, Integer count) {
                        System.out.println("add");
                        return count + 1;
                    }

                    @Override
                    public Integer getResult(Integer count) {
                        return count;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print("resultStream");
        env.execute();
    }


       上述代码的作用,就是开启一个滚动时间窗口,时间间隔为 15 秒,每隔 15 秒,对数据做一次增量聚合操作,统计各科目出现的次数。


       我们每 15 秒,输入红框内的数据,可以猜测一下结果。

       每输入一条数据,都会进行一次 add,当到达15秒后,将聚合结果输出。

全量窗口函数
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // socket 文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成 Score 类型
        DataStream dataStream = inputStream.map(new MapFunction() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 全窗口函数
        DataStream> resultStream = dataStream.keyBy("course")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable input, Collector> out) throws Exception {
                        String course = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3(course, windowEnd, count));
                    }
                });
        resultStream.print("resultStream2");

        env.execute();
    }

       我们同样按照上面的方式进行数据的输出,观察输出。


       可以看到,全量实在 15 秒到达时,统一做了聚合计算。

计数窗口

       创建一个滚动计数窗口,当各科目个数达到 3 时,则计算平均值。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // socket 文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成Score类型
        DataStream dataStream = inputStream.map(new MapFunction() {
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 开窗测试
        // 计数窗口
        DataStream resStream = dataStream.keyBy("course")
                .countWindow(3)
                .aggregate(new AggregateFunction, Double>() {

                    @Override
                    public Tuple2 createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    @Override
                    public Tuple2 add(Score score, Tuple2 accumulator) {
                        return new Tuple2<>(accumulator.f0 + score.getScore(), accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2 accumulator) {
                        return accumulator.f0 / accumulator.f1;
                    }

                    @Override
                    public Tuple2 merge(Tuple2 a, Tuple2 b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                });

        resStream.print();
        
        env.execute();
    }

       我们首先输入如下三个成绩,可以看到输出为 1.5 (1 + 2 + 3)/3 = 1.5,这是因为我们计数窗口的 size 为 3,所以在接受到 3 个数字后,便会进行输出。



       接着再输出一个成绩,此时并没有输出,是因为语文重新开始计数,个数不足 3 个。

       接着再输出三个数学成绩,可以看到已经计算出了对应的平均值。(5 + 6 +7) = 6.0

       上面就是滚动计数窗口的用法,还是很容易理解的。

       接下来的话,我们创建一个滑动计数窗口,每两个元素,计算一次最近 3 个元素的值。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket 文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成Score类型
        DataStream dataStream = inputStream.map(new MapFunction() {
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 计数窗口
        DataStream resStream = dataStream.keyBy("course")
                .countWindow(3, 2)
                .aggregate(new AggregateFunction, Double>() {

                    @Override
                    public Tuple2 createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    @Override
                    public Tuple2 add(Score score, Tuple2 accumulator) {
                        return new Tuple2<>(accumulator.f0 + score.getScore(), accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2 accumulator) {
                        return accumulator.f0 / accumulator.f1;
                    }

                    @Override
                    public Tuple2 merge(Tuple2 a, Tuple2 b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                });

        resStream.print();

        env.execute();
    }

       我们输入两个成绩,可以看到输出 1.5。这是因为每两个元素,计算最近 3 个元素的平均值,但是由于并没有第三个元素,所以输出(1 + 2) / 2 = 1.5


       此时,在输入一个元素,可以看到并没有输出,因为重新开始计数,个数并不足 2 个。

       接着在输入一个元素,次数输出 3 是因为(2 + 3 + 4) / 3 = 3

       通过上述两个案例,想必可以理解滚动与滑动的区别。

会话窗口

       会话窗口在 WaterMark 之后进行补充。

3. 其他可选 API
  • .trigger() ---- 触发器
  • .evictor() ---- 移除器
  • .allowedLateness() ---- 允许处理迟到的数据
  • .sideOutputLateData() ---- 将迟到的数据放入侧输出流
  • .getSideOutput() — 获取侧输出流
        OutputTag outputTag = new OutputTag("late") {};
        SingleOutputStreamOperator sumStream = dataStream.keyBy("course")
                .timeWindow(Time.seconds(15))
//                .trigger()
//                .evictor()
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .sum("score");

        sumStream.getSideOutput(outputTag).print("late");
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612810.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号