栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

彻底搞懂 Flink Watermark 和 Window

彻底搞懂 Flink Watermark 和 Window

彻底搞懂 Fink Watermark 和 Window
    • 1.有界无序水印和 lambda 函数作为时间戳分配器
    • 2.为时间戳单调递增的情况创建水印策略
    • 3.空闲分区数据流
    • 4.调用 watermark 生成生成器
    • 5.水位线的作用
    • 6.水位线的原理
    • 7.窗口 trigger
    • 8.定时器 timer
    • 9.自定义 WatermarkStrategy
    • 10.窗口生命周期
    • 11.GlobalWindow

WatermarkStrategy 定义了如何在流源中生成 Watermark 。 WatermarkStrategy 是用于构建水印生成器 (WatermarkGenerator) 和事件时间戳分配器( TimestampAssigner) 的工厂。通常不会自己实现这个接口,而是使用静态方法获取

1.有界无序水印和 lambda 函数作为时间戳分配器
WatermarkStrategy
        .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
  			// TimestampAssigner 是可选的,kafka 从元数据获取
        .withTimestampAssigner((event, timestamp) -> event.f0);
2.为时间戳单调递增的情况创建水印策略
WatermarkStrategy
				// 此方法的实现是 有界无序 的一种特烈,乱序时间为 0;
        .>forMonotonousTimestamps()
  			// TimestampAssigner 是可选的,kafka 从元数据获取
        .withTimestampAssigner((event, timestamp) -> event.f0);
WatermarkStrategy.forMonotonousTimestamps()
3.空闲分区数据流

如果输入分区/分片之一在一段时间内不携带事件,则这意味着WatermarkGenerator也没有获得任何新信息作为水印的基础。我们称之为空闲输入空闲源。这是一个问题,因为您的某些分区可能仍然携带事件。在这种情况下,水印将被阻止,因为它被计算为所有不同并行水印的最小值。

WatermarkStrategy
        .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
  			// 装饰模式,在上面 WatermarkStrategy 基础上增加空闲分区的策略
        .withIdleness(Duration.ofMinutes(1));
4.调用 watermark 生成生成器

WatermarkGenerator接口有两个未实现方法

public interface WatermarkGenerator {

 
 void onEvent(T event, long eventTimestamp, WatermarkOutput output);

 
 void onPeriodicEmit(WatermarkOutput output);
}
// 每隔 10 秒钟,Flink 会调用 WatermarkGenerator#onPeriodicEmit() 方法,产生一个水位线。
env.getConfig().setAutoWatermarkInterval(10000);
5.水位线的作用

总结:1.触发事件时间下的定时器;2.通过定时器触发窗口的关闭。

水位线用来平衡延迟性和正确性。水位线告诉我们,在触发计算(例如关闭窗口并触发窗口计算)之前,我们需要等待事件多长时间。基于事件时间的操作符根据水位线来衡量系统的逻辑时间的进度。

设定水位线通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过5秒,就可以将水位线标记时间设为收到的最大时间戳减去5秒。另一种做法是,采用一个Flink作业监控事件流,学习事件的迟到规律,并以此构建水位线生成模型

6.水位线的原理

水印生成器 WatermarkGenerator 内部会记录截止当前事件的 maxTimestamp(不是窗口内的最大时间戳)。

// 周期性调用的方法 onPeriodicEmit() 内部会发出 watermark
// maxOutOfOrderness 表示设置的乱序时间(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(1))
output.emitWatermark(new Watermark( maxTimestamp - maxOutOfOrderness - 1));

当发出的 watermark 等于 window.getEnd() - 1时,触发 window 的 trigger 进行计算。

事件时间自增的数据流 maxOutOfOrderness 等于 0,乱序数据流是用户设置的。

7.窗口 trigger

trigger 是通过定时器实现的,处理时间的定时器触发是系统时间;事件时间的定时器触发是通过 watermark 触发的,当 watermark 超过定时器时间时,触发定时器执行 trigger 的,处理 window operator 。

EventTimeTrigger 判断触发的逻辑如下,当 window.maxTimestamp() <= ctx.getCurrentWatermark()时触发窗口计算

  • window.maxTimestamp() = window.getEnd() - 1

  • ctx.getCurrentWatermark() = 当前元素的最大时间戳 - maxOutOfOrderness - 1

也就是说:当前元素的最大时间戳 = window.getEnd() 时触发窗口计算,window 中的元素包含开始不包含结尾

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
  System.out.println("element = " + element);
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // if the watermark is already past the window fire immediately
    System.out.println(TriggerResult.FIRE);
    return TriggerResult.FIRE;
  } else {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    System.out.println(TriggerResult.CONTINUE);
    return TriggerResult.CONTINUE;
  }
}
8.定时器 timer

当定时器 timer 触发时,执行回调函数onTimer()。processElement()方法和onTimer()方法是同步(不是异步)方法,这样可以避免并发访问和操作状态。

针对每一个 key和 timestamp,只能注册一个定期器。

KeyedProcessFunction 默认将所有定时器的时间戳放在一个优先队列中。在Flink做检查点操作时,定时器也会被保存到状态后端中。

9.自定义 WatermarkStrategy
WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofMillis(1)).withTimestampAssigner((event, timestamp) -> event.f1));

自定义实现下面的例子,乱序时间处理,等同于

package com.zhangjian.learning;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class WatermarkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 每隔 10 秒钟,Flink 会调用 WatermarkGenerator#onPeriodicEmit() 方法,产生一个水位线
    env.getConfig().setAutoWatermarkInterval(10000);

    // 输入元素格式如 (a,1)(a,4),(a,5)
    DataStreamSource source = env.socketTextStream("localhost", 9999);

    // 转成元组
    SingleOutputStreamOperator> map = source.map(s -> {
      String[] split = s.split("\s");
      return Tuple2.of(split[0], Long.parseLong(split[1]));
    }).returns(Types.TUPLE(Types.STRING, Types.LONG));

    // 分配水位线
    SingleOutputStreamOperator> watermarks =
        map.assignTimestampsAndWatermarks(
            new WatermarkStrategy>() {

              
              @Override
              public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                // 此方法的参数是延迟生成 watermark,时间戳为 6 的事件事件生成 5 的 watermark,参数为 0 代表允许 1 毫秒延迟
                return new BoundedOutOfOrdernessGenerator(Duration.ofMillis(1));
              }

              
              @Override
              public TimestampAssigner> createTimestampAssigner(
                  TimestampAssignerSupplier.Context context) {
                return (element, recordTimestamp) -> element.f1;
              }
            });

            


    KeyedStream, String> keyedStream = watermarks.keyBy(s -> s.f0);

    WindowedStream, String, TimeWindow> window = keyedStream
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5)));

    // 增量聚合窗口函数
    SingleOutputStreamOperator> reduce = window.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

    SingleOutputStreamOperator process = window.process(new MyProcessWindowFunction());

    process.print();

    env.execute();
  }

  
  static class BoundedOutOfOrdernessGenerator implements WatermarkGenerator> {

    private long maxOutOfOrderness;

    private long maxTimestamp;

    public BoundedOutOfOrdernessGenerator(Duration maxOutOfOrdernes){
      this.maxOutOfOrderness = maxOutOfOrdernes.toMillis();
      this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernes.toMillis() + 1;
    }

    
    @Override
    public void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {
      
      maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
      System.out.println("maxTimestamp = " + maxTimestamp);
    }

    
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
      // System.out.println(LocalTime.now() + ", output = " + new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
      // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
      output.emitWatermark(new Watermark( maxTimestamp - maxOutOfOrderness - 1));
    }

  }

  
  static class MyProcessWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable> input, Collector out) {
      long count = 0;
      StringBuilder sb = new StringBuilder();
      for (Tuple2 in : input) {
        sb.append(in);
        count++;
      }
      String result = String.format("key=%s, window=%s, count=%d, maxTimestamp=%d, currentWatermark=%d, 元素=%s",
          key,
          context.window(),
          count,
          context.window().maxTimestamp(), context.currentWatermark(), sb.toString());
      out.collect(result);
    }
  }
}

测试1:设置允许的乱序时间是0(单调递增事件)

输入数据nc -lk 9999

# 输入
a 1
a 4
a 5
# 触发窗口关闭,输出
maxTimestamp = 1
maxTimestamp = 4
maxTimestamp = 5
key=a, window=TimeWindow{start=0, end=5}, count=2, maxTimestamp=4, currentWatermark=4, 元素=(a,1)(a,4)
# 结论:currentWatermark == context.window().maxTimestamp() 时触发窗口计算;水位线生成器会在提出水位线函数的返回值上减 1,并不是直接使用

测试2:设置允许的乱序时间是1(单调递增事件)(copy 上面的代码)

输入数据nc -lk 9999

# 输入
a 1
a 4
a 5
a 6
# 触发窗口关闭,输出
maxTimestamp = 1
maxTimestamp = 4
maxTimestamp = 5
maxTimestamp = 6
key=a, window=TimeWindow{start=0, end=5}, count=2, maxTimestamp=4, currentWatermark=4, 元素=(a,1)(a,4)
# 结论:修改乱序时间本质上修改的是 watermark 的发送,比原本发送的小。
10.窗口生命周期

当一个事件来到窗口操作符,首先将会传给 WindowAssigner 来处理。WindowAssigner 决定了事件将被分配到哪些窗口。如果窗口不存在,WindowAssigner 将会创建一个新的窗口。所以不存在没有元素的窗口。

如果一个 window operator 接受了一个增量聚合函数作为参数,例如 ReduceFunction 或者 AggregateFunction,新到的元素将会立即被聚合,而聚合结果 result 将存储在 window 中。

如果 window operator 没有使用增量聚合函数,那么新元素将被添加到ListState中,ListState中保存了所有分配给窗口的元素。

新元素被添加到窗口时,这个新元素同时也被传给了 window 的 trigger。trigger 定义了 window 何时准备好求值,何时 window 被清空。trigger可以基于window被分配的元素和注册的定时器来对窗口的所有元素求值或者在特定事件清空window中所有的元素。

evictor是一个可选的组件,可以被注入到ProcessWindowFunction之前或者之后调用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在没有增量聚合函数作为参数的情况下。

stream
  .keyBy(...)
  .window(...)
 [.trigger(...)]
 [.evictor(...)]
  .reduce/aggregate/process(...)

注意:每个WindowAssigner都有一个默认的trigger。

11.GlobalWindow

GlobalWindow继承了Window,它的maxTimestamp方法与TimeWindow不同,TimeWindow有start和end属性,其maxTimestamp方法返回的是end-1;而GlobalWindow的maxTimestamp方法返回的是Long.MAX_VALUE;

全局窗口分配器将具有相同键的所有元素分配给同一个全局窗口。 此窗口方案仅在您还指定自定义触发器时才有用。

input
    .keyBy()
    .window(GlobalWindows.create())
    .();
// GlobalWindow 的 maxTimestamp,永远不会关闭,所以必须自自定触发器
public class GlobalWindow extends Window {
		@Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }
}

// TimeWindow 的 maxTimestamp,等于 windowEnd - 1
public class TimeWindow extends Window {

	@Override
	public long maxTimestamp() {
		return end - 1;
	}
}


public class GlobalWindows extends WindowAssigner {
  // 默认触发器:永远不会触发的触发器
  @Override
	public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {
		return new NeverTrigger();
	}
  
  // 非事件时间
  @Override
	public boolean isEventTime() {
		return false;
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676136.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号