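org.apache.flink.streaming.api.functions: the core classes live in the streaming package
I. DataStream entry points
1. AssignerWithPeriodicWatermarks
Generates watermarks periodically: the original stream is wrapped in an operator that emits watermarks on a timer.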
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(timestampAndWatermarkAssigner);
        // The new operator keeps the output type and parallelism of the upstream transformation.
        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(getTransformation().getParallelism());
    }
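2. AssignerWithPunctuatedWatermarks
Every element is checked as it is processed to decide whether a watermark should be emitted, so watermark generation depends on the data itself.
For example, a watermark is emitted whenever an element ending in xx arrives.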
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {
        TimestampsAndPunctuatedWatermarksOperator<T> operator =
                new TimestampsAndPunctuatedWatermarksOperator<>(clean(timestampAndWatermarkAssigner));
        // The new operator keeps the output type and parallelism of the upstream transformation.
        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(getTransformation().getParallelism());
    }
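The "element ending in xx" rule can be tried out without a Flink cluster. The following is a minimal plain-Java sketch, not the Flink interface: the class name PunctuatedSketch, the MARKER constant, and the run helper are hypothetical, and the check method only borrows its name from the real assigner. It also skips the operator's check that a new watermark is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Plain-Java stand-in for a punctuated assigner; hypothetical, not a Flink class. */
class PunctuatedSketch {
    /** Assumed marker: elements ending with this suffix trigger a watermark. */
    static final String MARKER = "xx";

    /** Returns a watermark timestamp only when the element carries the marker. */
    static OptionalLong checkAndGetNextWatermark(String element, long extractedTimestamp) {
        return element.endsWith(MARKER)
                ? OptionalLong.of(extractedTimestamp)
                : OptionalLong.empty();
    }

    /** Runs the check on every element and collects the watermarks that were produced. */
    static List<Long> run(String[] elements, long[] timestamps) {
        List<Long> watermarks = new ArrayList<>();
        for (int i = 0; i < elements.length; i++) {
            OptionalLong wm = checkAndGetNextWatermark(elements[i], timestamps[i]);
            if (wm.isPresent()) {
                watermarks.add(wm.getAsLong());
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        String[] elements = {"a", "b-xx", "c", "d-xx"};
        long[] timestamps = {10, 20, 30, 40};
        System.out.println(run(elements, timestamps)); // prints [20, 40]
    }
}
```

Only the two marked elements produce a watermark; the others pass through without one, which is the difference from the periodic variant.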
II. How it works
2.1 Periodic watermark generation: the core methods (the operator extends AbstractUdfStreamOperator)
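    // Abridged from the operator source.
    public class TimestampsAndPeriodicWatermarksOperator<T>
            extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
            implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

        transient long watermarkInterval; // interval between watermark emissions
        transient long currentWatermark;  // timestamp of the last emitted watermark

        public void open() throws Exception {
            // Initialize the watermark interval from the execution config.
            watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
            // Register a timer so that watermarks are produced periodically.
            if (watermarkInterval > 0) {
                long now = getProcessingTimeService().getCurrentProcessingTime();
                getProcessingTimeService().registerTimer(now + watermarkInterval, this);
            }
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            // Normal per-record path; userFunction is the AssignerWithPeriodicWatermarks.
            final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
            // Forward a StreamRecord that carries the newly extracted timestamp.
            output.collect(element.replace(element.getValue(), newTimestamp));
        }

        public void onProcessingTime(long timestamp) throws Exception {
            // The timer fired: ask the AssignerWithPeriodicWatermarks for the current watermark.
            Watermark newWatermark = userFunction.getCurrentWatermark();
            // Emit only if the watermark has advanced.
            if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
                currentWatermark = newWatermark.getTimestamp();
                output.emitWatermark(newWatermark);
            }
            // Register the next timer so the cycle continues.
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }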
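The register, fire, re-register cycle is easier to follow with a clock that is advanced by hand. The sketch below is a deterministic plain-Java simulation under stated assumptions, not Flink code: PeriodicSketch, advanceProcessingTimeTo, and the "largest timestamp seen" assigner are all hypothetical, and the next timer is scheduled from the firing time rather than from a real processing-time service.

```java
import java.util.ArrayList;
import java.util.List;

/** Deterministic stand-in for the periodic operator; hypothetical, not a Flink class. */
class PeriodicSketch {
    final long watermarkInterval;              // interval between timer firings
    long currentWatermark = Long.MIN_VALUE;    // last emitted watermark
    long maxTimestamp = Long.MIN_VALUE;        // state of the assumed assigner
    long nextTimer;                            // processing time of the registered timer
    final List<Long> emitted = new ArrayList<>();

    /** Plays the role of open(): registers the first timer at now + interval. */
    PeriodicSketch(long watermarkInterval, long now) {
        this.watermarkInterval = watermarkInterval;
        this.nextTimer = now + watermarkInterval;
    }

    /** Per-element path: only updates the assigner state, never emits a watermark. */
    void processElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Moves the simulated clock forward and fires every timer that has become due. */
    void advanceProcessingTimeTo(long now) {
        while (nextTimer <= now) {
            onProcessingTime(nextTimer);
        }
    }

    /** Timer callback: emit if the watermark advanced, then register the next timer. */
    private void onProcessingTime(long firedAt) {
        if (maxTimestamp > currentWatermark) {
            currentWatermark = maxTimestamp;
            emitted.add(currentWatermark);
        }
        nextTimer = firedAt + watermarkInterval;
    }

    public static void main(String[] args) {
        PeriodicSketch op = new PeriodicSketch(200, 0);
        op.processElement(50);
        op.advanceProcessingTimeTo(200);  // first timer fires, watermark 50
        op.processElement(40);            // older element, watermark does not move
        op.advanceProcessingTimeTo(400);  // timer fires, nothing new to emit
        op.processElement(90);
        op.advanceProcessingTimeTo(600);  // timer fires, watermark 90
        System.out.println(op.emitted);   // prints [50, 90]
    }
}
```

Elements never emit watermarks themselves here; they only change what the next timer firing will report, which mirrors the split between processElement and onProcessingTime above.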
III. An implementation of periodic watermark generation
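    abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

        // Constructor: sets the maximum allowed out-of-orderness (body omitted).
        BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness)

        // Extracts the element's timestamp. It is called for every element that flows
        // through, and currentMaxTimestamp is updated along the way.
        abstract long extractTimestamp(T element);

        // Produces the watermark; called each time the periodic timer fires.
        Watermark getCurrentWatermark() {
            // Hold the clock back by the allowed out-of-orderness first.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            // The watermark may only move forward.
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }
    }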
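The arithmetic behind the bounded out-of-orderness watermark can be checked with a few numbers. This is a minimal plain-Java sketch that assumes timestamps are plain long milliseconds and replaces the Watermark and Time types with long values; the class name BoundedOutOfOrdernessSketch is hypothetical and the code is not the Flink class itself.

```java
/** Plain-Java stand-in for the bounded out-of-orderness logic; hypothetical, not a Flink class. */
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;            // allowed lateness in milliseconds
    private long currentMaxTimestamp;                // largest event timestamp seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start high enough that the subtraction in getCurrentWatermark cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Called for every element: remembers the largest timestamp and returns the element's own. */
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = elementTimestamp;
        }
        return elementTimestamp;
    }

    /** Called when the timer fires: max timestamp minus the bound, never moving backwards. */
    long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(5);
        assigner.extractTimestamp(100);
        System.out.println(assigner.getCurrentWatermark()); // prints 95
        assigner.extractTimestamp(98);                       // late element, max stays at 100
        System.out.println(assigner.getCurrentWatermark()); // prints 95
        assigner.extractTimestamp(110);
        System.out.println(assigner.getCurrentWatermark()); // prints 105
    }
}
```

With a bound of 5 ms, the element stamped 98 arrives after the one stamped 100 but is still ahead of the watermark 95, so a downstream window would not treat it as late.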



