栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 窗口触发器

Flink 窗口触发器

窗口触发器 1. 为什么要有触发器
  • 决定何时触发窗口后续的逻辑执行。每个窗口都有一个默认的触发器,时间窗口默认watermark超过EndTime就触发计算
窗口类型触发器触发时机
EventTime(Tumblng/Sliding/Session)EventTimeTrigger一旦Watermark没过窗口的EndTime,该窗口触发
ProcessingTime(Tumblng/Sliding/Session)ProcessingTimeTrigger一旦系统时间没过窗口的EndTime,该窗口触发
GlobalWindowNeverTrigger永不触发
2. 触发器怎么使用
  • Flink中定义了Trigger抽象类,任何trigger必须继承Trigger类,Flink官方提供了几种常用的trigger实现,同时,用户可以根据需求自定义trigger。
public abstract class Trigger implements Serializable {

	private static final long serialVersionUID = -4104633972991191369L;

	
	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

   
	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

   
	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

	
	public boolean canMerge() {
		return false;
	}

	
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		throw new UnsupportedOperationException("This trigger does not support merging.");
	}


	public abstract void clear(W window, TriggerContext ctx) throws Exception;
	
}

  • 前三方法决定着如何通过返回一个TriggerResult枚举类来操作输入事件

CONTINUE:什么都不做
FIRE:触发计算
PURE:清除窗口的元素
FIRE_AND_PURE:触发计算和清除窗口元素

5. ProcessingTimeTrigger 源码分析
public class ProcessingTimeTrigger extends Trigger {
	private static final long serialVersionUID = 1L;

	private ProcessingTimeTrigger() {}



	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}


	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}


	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the time is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the time is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "ProcessingTimeTrigger()";
	}

	
	public static ProcessingTimeTrigger create() {
		return new ProcessingTimeTrigger();
	}
}

  • 需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676773.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号