栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink - Scala/Java trigger 简介与使用

Flink - Scala/Java trigger 简介与使用

一.引言

Flink 使用 windowAll 生成 AllwindowedStream 后调用 Trigger 执行窗口触发逻辑,下面对 Trigger 触发器做一个基本的了解。

二.Trigger 简介

Trigger 翻译为触发,扳机,其作用为在一定条件下触发窗口进行计算,如果是内部 operator 则执行对应 operator,如果自定义实现了 ProcessAllWindowFunction,则触发自定义执行逻辑。触发器决定窗口(由窗口赋值器形成)何时准备由窗口函数处理。每个WindowAssigner都有一个默认的触发器。如果默认触发器不符合您的需要,您可以使用trigger(…)指定一个自定义触发器。Trigger 多见于将大窗口的数据实时输出,例如针对 100s 的窗口,每 10s 触发一次数据执行窗口逻辑。

1.触发器方法

· onElement

public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;

对于添加到窗口中的每个元素,都会调用onElement()方法。以最基础的 CountTrigger 为例,每当元素到达,对应 Trigger 类都会进行计数器累加和判断,如果到达数量累加至对应 count,则进行触发执行一次窗口逻辑。

· onEventTime

public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

当注册的事件时间定时器触发时,调用onEventTime()方法。一般触发后会重置执行时间或注册下一次执行的 eventTime。

· onProcessingTime

public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

当注册的处理时间计时器触发时,将调用onProcessingTime()方法。基本处理方式同上。

· onMerge

public void onMerge(W window, Trigger.onMergeContext ctx) throws Exception {
    throw new UnsupportedOperationException("This trigger does not support merging.");
}

onMerge()方法与有状态触发器相关,当它们对应的窗口合并时,可以合并两个触发器的状态,例如使用会话窗口时。两个窗口合并时,合并二者的状态值,可以看做是 reduce 函数,将 TimeWindow1 和 Timewindow2 的 state 变量合二为一。

· clear

public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;

最后,clear()方法执行删除相应窗口时所需的任何操作。以最基础的 CountTrigger 为例,clear 会清空计数器状态,即重新置为0。

· canMerge

    public boolean canMerge() {
        return false;
    }

trigger 是否支持 onMerge 方法合并二者状态。

2.触发器状态

onElement,onProcessTime,onEventTime 三个方法都会返回一个 TriggerResult,该类为枚举类,对应了执行方法后返回的窗口操作。

· TriggerResult.CONTINUE - 跳过,什么都不做

· TriggerResult.FIRE - 触发窗口计算

· TriggerResult.PURGE - 清除窗口元素

· TriggerResult.FIRE_AND_PURGE - 触发窗口操作,随后清空窗口元素

以 CountTrigger 为例,每攒够 Count 个元素,都会返回 TriggerResult.FIRE 执行窗口逻辑,而未够 Count 数量时则会 TiggerResult.CONTINUE。

3.Flink 自带 Trigger

 Flink org.apache.flink.streaming.api.windowing.triggers 类下自带如下窗口触发器,如果需要自定义触发器则只需实现 Trigger 类的触发器方法,例如可以结合 CountTrigger 和 ProcessingTimeTrigger 实现基于处理条数和处理时间的双重触发器 CountAndProcessingTime Trigger。

ContinuousEventTimeTrigger连续事件时间触发器
ContinuousProcessingTimeTrigger连续处理时间触发器
CountTrigger计数触发器
DeltaTrigger阈值触发器
EventTimeTrigger事件时间触发器
ProcessingTimeoutTrigger处理时间超时触发器
ProcessingTimeTrigger处理时间触发器
PurgingTrigger强制 PURGE 触发器

三.API 示例 1.Scala 示例

下述示例将原始 DataStream 按10s进行滚动窗口聚合,其中 Trigger 设置为 CountTrigger,每满足 30 个元素进行一次触发。

    val allwindowedStream = dataStream
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(CountTrigger.of[TimeWindow](30L))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val info = elements.toArray.mkString(",")
          out.collect(info)
        }
      }).setParallelism(1)
    allwindowedStream.print()

Tips:

Trigger 参数需要指定 implict T 即 of 后面的 [TimeWindow],这里如果添加对应数据的输出类型 T,则会报错 Required: Trigger[_ >: String,_ >: TimeWindow] :

Required: Trigger[_ >: String,_ >: TimeWindow]
Found: ContinuousProcessingTimeTrigger[String]

2.Java 示例

下述示例针对原始 DataStream 生成 10s 的滚动窗口,并且按连续处理时间每 5s 触发一次窗口的处理逻辑即 ProcessFunction。

       dataStream 
           .setParallelism(processParallel)
           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
           .process(new ProcessFunction())
           .addSink(outputSink)
           .setParallelism(processParallel)
           .print()

四.总结与注意事项 1.默认触发器

基于 EventTime 的窗口默认使用 EventTimeTrigger,基于 ProcessTime 的窗口默认使用 ProcessingTimeTrigger

2.GlobalWindow

GlobalWindow的默认触发器是永不触发的NeverTrigger。因此,在使用GlobalWindow时,您总是必须定义一个自定义触发器。

3.窗口触发逻辑

一旦触发器确定窗口已经准备好进行处理,它就会触发,也就是说,它返回FIRE或FIRE_AND_PURGE。这是窗口操作符发出当前窗口结果的信号。给定一个带有ProcessWindowFunction的窗口,所有元素都被传递给ProcessWindowFunction。带有ReduceFunction或AggregateFunction的窗口只是发送它们的聚合结果。

4.FIRE AND PURGE

FIRE 触发窗口不清除窗口元素,PURGE 触发窗口但会清除窗口元素,进行自定义编辑时需要注意,避免窗口触发后损失一批数据,其次 PURGE 只会清除窗口的元素,窗口一些自定义的元数据和基本属性并不会清除。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775256.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号