Generating Watermarks in Flink with assignTimestampsAndWatermarks

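Calling assignTimestampsAndWatermarks on a DataStreamSource object lets you define custom rules for extracting Timestamps and generating Watermarks. Before Flink 1.11, Flink shipped four built-in timestamp assigners (two interfaces and two abstract classes), each covered in its own section below:

  • The AssignerWithPeriodicWatermarks interface
  • The AssignerWithPunctuatedWatermarks interface
  • The AscendingTimestampExtractor abstract class
  • The BoundedOutOfOrdernessTimestampExtractor abstract class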

The AssignerWithPeriodicWatermarks interface

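The AssignerWithPeriodicWatermarks interface extends the TimestampAssigner interface. Its extractTimestamp method defines how the Timestamp is extracted from each event, and its getCurrentWatermark method defines how the Watermark is generated. Flink calls getCurrentWatermark periodically, at the interval configured with setAutoWatermarkInterval.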

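// Example: extract timestamps and emit periodic watermarks by passing an
// AssignerWithPeriodicWatermarks to assignTimestampsAndWatermarks
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 10 milliseconds
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data; ClickEvent is the article's event class (user, dateTime, value), not shown here
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // AssignerWithPeriodicWatermarks generates watermarks periodically
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<ClickEvent>() {
                    // Largest event timestamp seen so far
                    private long maxTimestamp = 0L;
                    // Allowed delay in milliseconds
                    private long delay = 0L;

                    // Custom timestamp extraction rule
                    @Override
                    public long extractTimestamp(ClickEvent event, long previousElementTimestamp) {
                        try {
                            // Slow down processing; otherwise only a single watermark may be generated
                            Thread.sleep(100L);
                        } catch (InterruptedException ex) {
                            // Restore the interrupt flag instead of swallowing the exception
                            Thread.currentThread().interrupt();
                        }
                        // Update maxTimestamp with the current event time if it is larger
                        maxTimestamp = Math.max(event.getDateTime(), maxTimestamp);
                        System.out.println("timestamp: " + event.getDateTime());
                        // Return the extracted timestamp
                        return event.getDateTime();
                    }

                    // Called periodically (every 10 ms here) to produce the current watermark
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        System.out.println("watermark: " + (maxTimestamp - delay));
                        return new Watermark(maxTimestamp - delay);
                    }
                });
        // Print the result
        streamTS.print();
        env.execute();
    }
}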
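
The examples in this article use a ClickEvent class that is never shown. A minimal sketch of what it might look like follows, inferred only from the constructor calls and the getDateTime() and getValue() getters used above. The field name user, the getUser() accessor, the setters, and the toString format are assumptions, not taken from the original article. In a real project the class would be declared public in its own file so that Flink can treat it as a POJO.

```java
// Hypothetical event class inferred from the calls in the examples above.
class ClickEvent {
    private String user;    // assumed name for the first constructor argument, e.g. "user1"
    private long dateTime;  // event time in milliseconds, used as the timestamp
    private int value;      // payload, used by the punctuated example

    // No-argument constructor, which Flink's POJO serializer requires
    public ClickEvent() {
    }

    public ClickEvent(String user, long dateTime, int value) {
        this.user = user;
        this.dateTime = dateTime;
        this.value = value;
    }

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }

    public long getDateTime() { return dateTime; }
    public void setDateTime(long dateTime) { this.dateTime = dateTime; }

    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }

    @Override
    public String toString() {
        return "ClickEvent{user='" + user + "', dateTime=" + dateTime + ", value=" + value + "}";
    }
}

class ClickEventDemo {
    public static void main(String[] args) {
        // Same argument order as in the demo data: user, dateTime, value
        ClickEvent event = new ClickEvent("user1", 3L, 3);
        System.out.println(event);
    }
}
```

The toString method only makes the output of streamTS.print() readable; the watermark logic itself relies solely on getDateTime() and getValue().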

The AssignerWithPunctuatedWatermarks interface

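The AssignerWithPunctuatedWatermarks interface generates Watermarks non-periodically: a Watermark is triggered by a special condition on the event data. For each event, extractTimestamp is called to extract the Timestamp, and checkAndGetNextWatermark is then called to decide whether a Watermark should be emitted.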

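// Example: extract timestamps and emit non-periodic watermarks by passing an
// AssignerWithPunctuatedWatermarks to assignTimestampsAndWatermarks
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data; ClickEvent is the article's event class (user, dateTime, value), not shown here
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // AssignerWithPunctuatedWatermarks lets a custom rule decide when to emit a watermark
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                new AssignerWithPunctuatedWatermarks<ClickEvent>() {
                    // Largest event timestamp seen so far
                    private long maxTimestamp = 0L;
                    // Allowed delay in milliseconds
                    private long delay = 0L;

                    // Custom timestamp extraction rule
                    @Override
                    public long extractTimestamp(ClickEvent event, long previousElementTimestamp) {
                        try {
                            // Slow down processing so the output is easier to follow
                            Thread.sleep(100L);
                        } catch (InterruptedException ex) {
                            // Restore the interrupt flag instead of swallowing the exception
                            Thread.currentThread().interrupt();
                        }
                        // Update maxTimestamp with the current event time if it is larger
                        maxTimestamp = Math.max(event.getDateTime(), maxTimestamp);
                        System.out.println("timestamp: " + event.getDateTime());
                        // Return the extracted timestamp
                        return event.getDateTime();
                    }

                    // Decides whether this event triggers a watermark
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(ClickEvent event, long extractedTimestamp) {
                        // Emit a watermark when the value is a multiple of 3
                        if (event.getValue() % 3 == 0) {
                            System.out.println("watermark: " + (maxTimestamp - delay));
                            return new Watermark(maxTimestamp - delay);
                        } else {
                            // Otherwise emit no watermark
                            return null;
                        }
                    }
                });
        // Print the result
        streamTS.print();
        env.execute();
    }
}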
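
To see which events trigger a watermark under the rule above without starting a Flink job, the same logic can be replayed in plain Java. The following is only a sketch of the rule in this example ("emit when value is a multiple of 3"), not of Flink's operator. The class name PunctuatedWatermarkSketch and the method watermarksFor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of the punctuated rule from the example above (no Flink involved).
class PunctuatedWatermarkSketch {

    // Returns the watermark values produced for the given events, in order.
    static List<Long> watermarksFor(long[] timestamps, int[] values, long delay) {
        List<Long> emitted = new ArrayList<>();
        long maxTimestamp = 0L;
        for (int i = 0; i < timestamps.length; i++) {
            // Mirrors extractTimestamp: track the largest timestamp seen so far
            maxTimestamp = Math.max(timestamps[i], maxTimestamp);
            // Mirrors checkAndGetNextWatermark: emit only for multiples of 3
            if (values[i] % 3 == 0) {
                emitted.add(maxTimestamp - delay);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8};
        // Same demo data and delay (0) as in the article
        System.out.println(watermarksFor(timestamps, values, 0L)); // prints [3, 6]
    }
}
```

With the article's demo data, only the events with values 3 and 6 produce a watermark; the other six events pass through without advancing event time.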

The AscendingTimestampExtractor abstract class

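AscendingTimestampExtractor is a periodic Watermark strategy that implements the AssignerWithPeriodicWatermarks interface. The abstract class declares one abstract method, extractAscendingTimestamp(T element), which extracts a monotonically increasing timestamp from the event. The extractTimestamp and getCurrentWatermark methods are already implemented internally, so you only need to specify how to obtain the timestamp from an event. The Watermark value is the largest timestamp seen so far minus 1.

Core excerpt from the AscendingTimestampExtractor class: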

	public abstract long extractAscendingTimestamp(T element);
	
	public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {
		this.violationHandler = requireNonNull(handler);
		return this;
	}
	// ------------------------------------------------------------------------
	@Override
	public final long extractTimestamp(T element, long elementPrevTimestamp) {
		final long newTimestamp = extractAscendingTimestamp(element);
		if (newTimestamp >= this.currentTimestamp) {
			this.currentTimestamp = newTimestamp;
			return newTimestamp;
		} else {
			violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
			return newTimestamp;
		}
	}
	@Override
	public final Watermark getCurrentWatermark() {
		return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
	}
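
The excerpt above can be traced with a small stand-alone model. This is a sketch that copies only the two methods shown. The class name AscendingWatermarkSketch is hypothetical, and a simple violation counter stands in for the violationHandler, which the excerpt does not show.

```java
// Plain-Java model of the extractTimestamp / getCurrentWatermark logic shown above.
class AscendingWatermarkSketch {
    // Starts at Long.MIN_VALUE, which getCurrentWatermark treats as "no element seen yet"
    private long currentTimestamp = Long.MIN_VALUE;
    // Stand-in for violationHandler.handleViolation(...): just count the violations
    private int violations = 0;

    long extractTimestamp(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            // Timestamp went backwards: report it, but still return it unchanged
            violations++;
        }
        return newTimestamp;
    }

    long currentWatermark() {
        // Largest timestamp minus 1, guarded so Long.MIN_VALUE does not underflow
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        for (long ts : new long[] {5L, 3L, 8L}) {
            sketch.extractTimestamp(ts);
            System.out.println("timestamp=" + ts + " watermark=" + sketch.currentWatermark());
        }
        System.out.println("violations=" + sketch.violations());
    }
}
```

The out-of-order timestamp 3 does not move the watermark backwards; it only triggers the violation path, which is why this class suits sources whose timestamps really are ascending.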

Example of extracting Timestamps and generating periodic Watermarks with AscendingTimestampExtractor:

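// Example: extract timestamps and emit periodic watermarks by passing an
// AscendingTimestampExtractor to assignTimestampsAndWatermarks
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 10 milliseconds
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data; ClickEvent is the article's event class (user, dateTime, value), not shown here
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // AscendingTimestampExtractor implements AssignerWithPeriodicWatermarks,
        // so watermarks are generated periodically
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<ClickEvent>() {
                    @Override
                    public long extractAscendingTimestamp(ClickEvent event) {
                        try {
                            // Slow down processing; otherwise only a single watermark may be generated
                            Thread.sleep(100L);
                        } catch (InterruptedException ex) {
                            // Restore the interrupt flag instead of swallowing the exception
                            Thread.currentThread().interrupt();
                        }
                        System.out.println("getDateTime:" + event.getDateTime());
                        // Return the extracted timestamp
                        return event.getDateTime();
                    }
                });
        // Print the result
        streamTS.print();
        env.execute();
    }
}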

The BoundedOutOfOrdernessTimestampExtractor abstract class

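BoundedOutOfOrdernessTimestampExtractor is a periodic Watermark strategy that implements the AssignerWithPeriodicWatermarks interface. The abstract class declares one abstract method, extractTimestamp(T element), which extracts the timestamp from the event. The internal extractTimestamp(T element, long previousElementTimestamp) method first calls your implementation of extractTimestamp(T element) and then compares the result with the current maximum timestamp, which guarantees that the internal currentMaxTimestamp never decreases.

When a BoundedOutOfOrdernessTimestampExtractor is constructed, the caller must pass in the maximum allowed delay, maxOutOfOrderness, as a Time value. Internally it is converted to a long with maxOutOfOrderness.toMilliseconds() and used in Watermark generation.

The getCurrentWatermark() method generates the Watermark as currentMaxTimestamp - maxOutOfOrderness, and only advances it when that value is not smaller than the last emitted Watermark.

Core excerpt from the BoundedOutOfOrdernessTimestampExtractor class: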

	public abstract long extractTimestamp(T element);
	@Override
	public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards.
		long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
		if (potentialWM >= lastEmittedWatermark) {
			lastEmittedWatermark = potentialWM;
		}
		return new Watermark(lastEmittedWatermark);
	}
	@Override
	public final long extractTimestamp(T element, long previousElementTimestamp) {
		long timestamp = extractTimestamp(element);
		if (timestamp > currentMaxTimestamp) {
			currentMaxTimestamp = timestamp;
		}
		return timestamp;
	}
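
The two methods in the excerpt can be exercised with a small stand-alone model to see how late events affect the watermark. This is a sketch under stated assumptions: the class name BoundedOutOfOrdernessSketch is hypothetical, and the initial field values (currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness, lastEmittedWatermark = Long.MIN_VALUE) are assumed, since the excerpt does not show the constructor.

```java
// Plain-Java model of the extractTimestamp / getCurrentWatermark logic shown above.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE; // assumed initial value

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Assumed initial value: chosen so the first subtraction cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    long extractTimestamp(long timestamp) {
        // Only a strictly larger timestamp advances the maximum
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    long currentWatermark() {
        // The watermark trails the maximum timestamp by maxOutOfOrderness
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        // Guarantees that the watermark never goes backwards
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        // 2 ms bound, matching Time.milliseconds(2L) in the article's example
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(2L);
        for (long ts : new long[] {5L, 3L, 8L}) {
            sketch.extractTimestamp(ts);
            System.out.println("timestamp=" + ts + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With a 2 ms bound, the event with timestamp 3 arrives while the watermark is also 3, so it is not yet considered late; the watermark only moves on once a larger timestamp arrives.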

Example of extracting Timestamps and generating periodic Watermarks with BoundedOutOfOrdernessTimestampExtractor:

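// Example: extract timestamps and emit periodic watermarks by passing a
// BoundedOutOfOrdernessTimestampExtractor to assignTimestampsAndWatermarks
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 10 milliseconds
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data; ClickEvent is the article's event class (user, dateTime, value), not shown here
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // BoundedOutOfOrdernessTimestampExtractor implements AssignerWithPeriodicWatermarks,
        // so watermarks are generated periodically.
        // maxOutOfOrderness is the maximum allowed delay (2 ms here), which lets late data be handled.
        // Watermark = currentMaxTimestamp - maxOutOfOrderness
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.milliseconds(2L)) {
                    @Override
                    public long extractTimestamp(ClickEvent event) {
                        try {
                            // Slow down processing; otherwise only a single watermark may be generated
                            Thread.sleep(100L);
                        } catch (InterruptedException ex) {
                            // Restore the interrupt flag instead of swallowing the exception
                            Thread.currentThread().interrupt();
                        }
                        System.out.println("getDateTime:" + event.getDateTime());
                        // Return the extracted timestamp
                        return event.getDateTime();
                    }
                });
        // Print the result
        streamTS.print();
        env.execute();
    }
}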

Please credit the source when reposting: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/582285.html