A Systematic Study of the Flink Stream Processing Engine (Part 12)


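Preface

This installment covers windows. Only with a clear understanding of them can you choose the right windowing approach for a given scenario.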


I. Basic concepts of windows
1. What is a window

2. Window categories


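PS:
After grouping by key (keyBy), use window(), which builds a separate window for each key; on a stream that is not grouped, use windowAll() (the non-keyed APIs all carry the All suffix).
Example of the difference: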
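To make the distinction concrete, here is a minimal plain-Java sketch. It is not Flink code: it only imitates a count window of size 2 that sums values, once with a single shared buffer (roughly the windowAll case) and once with one buffer per key (roughly the keyBy(...).window(...) case). The class and method names are made up for illustration, and the sketch ignores parallelism, time, and state backends.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: imitates count windows of a fixed size without using Flink.
class KeyedVsAllWindowSketch {

    // Non-keyed case: every element goes into the same buffer,
    // and a sum is emitted each time the buffer holds `size` elements.
    static List<Integer> allWindowSums(int[] values, int size) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int v : values) {
            sum += v;
            count++;
            if (count == size) {
                out.add(sum);
                sum = 0;
                count = 0;
            }
        }
        return out;
    }

    // Keyed case: each key has its own buffer, so elements of different
    // keys never end up in the same window.
    static List<String> keyedWindowSums(String[] keys, int[] values, int size) {
        Map<String, int[]> state = new LinkedHashMap<>(); // key -> {sum, count}
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            int[] acc = state.computeIfAbsent(keys[i], k -> new int[2]);
            acc[0] += values[i];
            acc[1]++;
            if (acc[1] == size) {
                out.add(keys[i] + "=" + acc[0]);
                acc[0] = 0;
                acc[1] = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "b"};
        int[] values = {1, 10, 2, 20};
        System.out.println(allWindowSums(values, 2));         // [11, 22]
        System.out.println(keyedWindowSums(keys, values, 2)); // [a=3, b=30]
    }
}
```

With the same input, the shared buffer mixes elements of both keys (1+10, 2+20), while the per-key buffers keep them apart (1+2 for a, 10+20 for b).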

3. Window lifecycle

4. Window Assigner

5. Types of Window Assigner (window subcategories)


Tumbling windows

Using tumbling windows
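In the Flink API, a tumbling window is typically assigned with a call such as .window(TumblingEventTimeWindows.of(Time.seconds(5))). The plain-Java sketch below does not call Flink; it only illustrates the arithmetic of placing a timestamp into exactly one fixed-size, non-overlapping window. It assumes non-negative millisecond timestamps and no window offset, and the class and method names are hypothetical.

```java
// Illustration only: tumbling-window assignment arithmetic, without Flink.
class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp.
    // Assumes timestampMs >= 0 and no offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // The window covers [start, start + size), so the end is exclusive.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // hypothetical 5-second window
        for (long ts : new long[]{0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> ["
                + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, which is what distinguishes tumbling windows from sliding ones.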

Sliding windows

Using sliding windows
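In the Flink API, a sliding window is typically assigned with a call such as .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))), where the first argument is the window size and the second the slide. The plain-Java sketch below does not call Flink; it only shows why one element can belong to several overlapping windows. It assumes millisecond timestamps, no offset, and a size that is a multiple of the slide; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: which sliding windows contain a given timestamp.
class SlidingWindowSketch {

    // Returns the start timestamps of all windows [start, start + size)
    // that contain timestampMs, newest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second windows sliding every 5 seconds (hypothetical values).
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L)); // [10000, 5000]
    }
}
```

With size 10 s and slide 5 s, each element lands in size / slide = 2 windows; when the slide equals the size, the result degenerates to a tumbling window.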

Session windows

Using session windows

PS:
Session windows can only be time-based.
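In the Flink API, a session window is typically assigned with a call such as .window(EventTimeSessionWindows.withGap(Time.minutes(10))). Because sessions are defined by a period of inactivity, they only make sense on a time axis. The plain-Java sketch below does not call Flink; it groups already sorted timestamps of a single key into sessions. The names are hypothetical, and treating elements exactly one gap apart as the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: grouping timestamps into sessions separated by inactivity gaps.
class SessionWindowSketch {

    // Input must be sorted ascending. Each result is {sessionStart, sessionEnd},
    // where sessionEnd = last element's timestamp + gap (exclusive).
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestampsMs.length == 0) {
            return result;
        }
        long start = sortedTimestampsMs[0];
        long last = start;
        for (int i = 1; i < sortedTimestampsMs.length; i++) {
            long ts = sortedTimestampsMs[i];
            // Assumption: a new session starts only when the pause exceeds the gap.
            if (ts - last > gapMs) {
                result.add(new long[]{start, last + gapMs});
                start = ts;
            }
            last = ts;
        }
        result.add(new long[]{start, last + gapMs});
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_000L, 2_500L, 9_000L, 9_500L};
        for (long[] s : sessions(timestamps, 3_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Unlike tumbling and sliding windows, the boundaries here are not known in advance; they depend on when the data arrives.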
Global windows

6. Window roundup


Compare the examples and note how the windows differ.

7. Predefined keyed windows


These predefined windows can be used in place of .window().

8. Predefined non-keyed windows

II. Window functions


PS
WindowFunction/AllWindowFunction are left over from earlier versions and have since been superseded by ProcessWindowFunction/ProcessAllWindowFunction.

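1. ReduceFunction

2. AggregateFunction

3. FoldFunction


PS
Deprecated; it is officially no longer recommended.

4. WindowFunction/AllWindowFunction


PS
In 1.14.4, the version I am currently using, these are already marked deprecated and are officially no longer recommended.

5. ProcessWindowFunction/ProcessAllWindowFunction


This is the new generation of window functions.

6. Mixing and matching the new-generation window functions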


III. Triggers and evictors
1. What is a trigger

2. Firing and purging

3. Default triggers

4. Built-in and custom triggers

5. What evictors do

6. Built-in evictors


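IV. Handling late data and using window results
1. How to allow lateness

2. Getting late data

3. Notes on late elements

4. Using window results

5. How watermarks and windows interact

6. Notes on window estimation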


V. Window function examples
1. AggregateFunction
package spendreport.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TestAggFunctionOnWindow {

  // Sample records: (class, student name, English score)
  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100D),
      Tuple3.of("class1", "李四", 78D),
      Tuple3.of("class1", "王五", 99D),
      Tuple3.of("class2", "赵六", 81D),
      Tuple3.of("class2", "钱七", 59D),
      Tuple3.of("class2", "马二", 97D),
  };

  public static void main(String[] args) throws Exception {
    // Get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    // Compute the average English score per class.
    // countWindow(2) fires only once a key has collected 2 elements, so with
    // 3 records per class the third record of each class does not trigger a result.
    DataStream<Double> avgScore = input.keyBy(
        new KeySelector<Tuple3<String, String, Double>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Double> value)
              throws Exception {
            return value.f0;
          }

        }).countWindow(2).aggregate(new AverageAggregate());

    // Print the result
    avgScore.print();

    // Execute the job
    env.execute();

  }


  // Incremental average: the accumulator is a (sum, count) pair.
  private static class AverageAggregate implements
      AggregateFunction<Tuple3<String, String, Double>, Tuple2<Double, Long>, Double> {

    // Create an empty accumulator
    @Override
    public Tuple2<Double, Long> createAccumulator() {
      return new Tuple2<>(0D, 0L);
    }

    // Called for each incoming element
    @Override
    public Tuple2<Double, Long> add(Tuple3<String, String, Double> value,
        Tuple2<Double, Long> accumulator) {
      // Update sum and count and keep the intermediate result in the accumulator
      return new Tuple2<>(accumulator.f0 + value.f2, accumulator.f1 + 1L);
    }

    // Called when the window fires: turn the accumulator into the final result
    @Override
    public Double getResult(Tuple2<Double, Long> accumulator) {
      return accumulator.f0 / accumulator.f1;
    }

    // Combine two accumulators
    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1,
        Tuple2<Double, Long> acc2) {
      return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
    }
  }
}
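To see what the program above would be expected to emit, the following plain-Java sketch replays the same six records through a hand-written imitation of a per-key count window of size 2 with a (sum, count) accumulator. It does not use Flink; the class, the Acc type, and the method are hypothetical, and the sketch assumes the window state is discarded after each firing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: per-key count window with an incremental average, without Flink.
class CountWindowAverageSketch {

    // Mirrors the (sum, count) accumulator idea from AverageAggregate.
    static final class Acc {
        double sum;
        long count;
    }

    static List<String> averages(String[] keys, double[] scores, int windowSize) {
        Map<String, Acc> perKey = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Acc acc = perKey.computeIfAbsent(keys[i], k -> new Acc());
            acc.sum += scores[i]; // plays the role of add()
            acc.count++;
            if (acc.count == windowSize) {
                // Plays the role of getResult(), followed by discarding the window state.
                fired.add(keys[i] + "=" + (acc.sum / acc.count));
                perKey.remove(keys[i]);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] keys = {"class1", "class1", "class1", "class2", "class2", "class2"};
        double[] scores = {100, 78, 99, 81, 59, 97};
        System.out.println(averages(keys, scores, 2)); // [class1=89.0, class2=70.0]
    }
}
```

The third score of each class (99 and 97) stays in a window that never reaches two elements, so it does not contribute to any emitted average. If an average over all three scores is wanted, a different window size or window type would be needed.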

2. ReduceFunction
package spendreport.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TestReduceFunctionOnWindow {

  // Sample records: (class, student name, English score)
  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100),
      Tuple3.of("class1", "李四", 78),
      Tuple3.of("class1", "王五", 99),
      Tuple3.of("class2", "赵六", 81),
      Tuple3.of("class2", "钱七", 59),
      Tuple3.of("class2", "马二", 97),
  };

  public static void main(String[] args) throws Exception {
    // Get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    // Compute the total English score per class.
    // countWindow(2): the computation fires only once 2 elements have arrived for a key.
    DataStream<Tuple3<String, String, Integer>> totalPoints = input.keyBy(
        new KeySelector<Tuple3<String, String, Integer>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Integer> value)
              throws Exception {
            return value.f0;
          }

        }).countWindow(2).reduce(
        new ReduceFunction<Tuple3<String, String, Integer>>() {
          @Override
          public Tuple3<String, String, Integer> reduce(
              Tuple3<String, String, Integer> v1,
              Tuple3<String, String, Integer> v2) throws Exception {
            // Keep the class and the first student's name; add up the scores
            return new Tuple3<>(v1.f0, v1.f1, v1.f2 + v2.f2);
          }
        });
    // Print the result
    totalPoints.print();

    // Execute the job
    env.execute();

  }

}

3. ProcessWindowFunction
package spendreport.window;


import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;


public class TestProcessWinFunctionOnWindow {


  // Sample records: (class, student name, English score)
  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100D),
      Tuple3.of("class1", "李四", 78D),
      Tuple3.of("class1", "王五", 99D),
      Tuple3.of("class2", "赵六", 81D),
      Tuple3.of("class2", "钱七", 59D),
      Tuple3.of("class2", "马二", 97D),
  };

  public static void main(String[] args) throws Exception {
    // Get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    // Compute the average English score per class
    DataStream<Double> avgScore = input.keyBy(
        new KeySelector<Tuple3<String, String, Double>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Double> value)
              throws Exception {
            return value.f0;
          }
        }).countWindow(2).process(new MyProcessWindowFunction());

    // Print the result
    avgScore.print();

    // Execute the job
    env.execute();

  }

  // Type parameters: input, output, key, window. countWindow() uses GlobalWindow.
  public static class MyProcessWindowFunction extends
      ProcessWindowFunction<Tuple3<String, String, Double>, Double, String, GlobalWindow> {

    @Override
    public void process(String key,
        Context context,
        Iterable<Tuple3<String, String, Double>> iterable, Collector<Double> collector)
        throws Exception {
      // All elements of the window are available here; compute only at the end
      double sum = 0D;
      long count = 0L;
      for (Tuple3<String, String, Double> tp : iterable) {
        sum += tp.f2;
        count++;
      }
      double outScore = sum / count;
      collector.collect(outScore);
    }


  }
}

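PS
In this example, the third type parameter (KEY) of MyProcessWindowFunction must match the type returned by the KeySelector passed to keyBy().

Summary


Window functions can be written in many ways and are very flexible. Before using them in a business scenario, don't rush into coding; sort out the requirements first.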

If you repost this article, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/874535.html