栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flink Watermark 水印原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink Watermark 水印原理

org.apache.flink.streaming.api.functions 核心在streaming包内

一、DataStream 入口

1.AssignerWithPeriodicWatermarks

周期生产水印,在原有流基础上,包一层定时生产水印的程序

	public SingleOutputStreamOperator assignTimestampsAndWatermarks(
			AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) {

		TimestampsAndPeriodicWatermarksOperator operator =
				new TimestampsAndPeriodicWatermarksOperator<>        
                (timestampAndWatermarkAssigner);

		return transform("Timestamps/Watermarks",     
                 getTransformation().getOutputType(), operator)
				.setParallelism(getTransformation().getParallelism());
	}

2.AssignerWithPunctuatedWatermarks

每一个元素处理时,都要判断是否要生产水印,即是否生产水印取决于处理的数据。

比如遇到xx结尾的元素要生产一个水印

	public SingleOutputStreamOperator assignTimestampsAndWatermarks(
			AssignerWithPunctuatedWatermarks timestampAndWatermarkAssigner) {

    TimestampsAndPunctuatedWatermarksOperator operator =
				new TimestampsAndPunctuatedWatermarksOperator<>    
                (clean(timestampAndWatermarkAssigner));

    return transform("Timestamps/Watermarks",
         getTransformation().getOutputType(), operator)
        .setParallelism(getTransformation().getParallelism());
	}

二、原理

2.1 周期性的生产water水印 --- 核心方法 -- 继承AbstractUdfStreamOperator

public class TimestampsAndPeriodicWatermarksOperator
		extends AbstractUdfStreamOperator>
		implements OneInputStreamOperator, ProcessingTimeCallback {

transient long watermarkInterval;//发送水印时间戳间隔
transient long currentWatermark;//最后一次发送水印的时间戳

public void open() throws Exception
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();//初始化生产水印周期
    //设置周期性定时任务,定时产生水印
    if (watermarkInterval > 0) {
	    long now = getProcessingTimeService().getCurrentProcessingTime();
	    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

void processElement(StreamRecord element) //正常处理数据流的数据
		//function是AssignerWithPeriodicWatermarks
		final long newTimestamp = userFunction.extractTimestamp()
		output.collect(element.replace(element.getValue(), newTimestamp));//产生新的StreamRecord


public void onProcessingTime(long timestamp) { //触发定时任务--该输出水印了
		//function是AssignerWithPeriodicWatermarks
		Watermark newWatermark = userFunction.getCurrentWatermark();
        output.emitWatermark(newWatermark);//发送水印

        //再次设置定时任务,定时产生水印
		long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

三、周期生产水印实现

class BoundedOutOfOrdernessTimestampExtractor implements AssignerWithPeriodicWatermarks

    //构造函数,设置延迟时间
	BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness)

    //提取元素时间戳,每一个元素流过,都会提取时间戳,并且设置currentMaxTimestamp
	abstract long extractTimestamp(T element); 

    生产Watermark 当触发该函数的时候,就会设置Watermark
  Watermark getCurrentWatermark() 
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness; //先调慢时间
		return new Watermark(lastEmittedWatermark);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785506.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号