Flink watermark generation with WatermarkStrategy

Contents
  • Bounded out-of-orderness strategy (forBoundedOutOfOrderness)
  • Monotonously increasing timestamps strategy (forMonotonousTimestamps)
  • No-watermark strategy (noWatermarks)

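Since Flink 1.11, the recommended way to generate watermarks is a WatermarkStrategy. A WatermarkStrategy combines a TimestampAssigner, which extracts each event's timestamp, with a WatermarkGenerator, which decides when to emit watermarks. After creating a DataStream, specify the strategy with assignTimestampsAndWatermarks(WatermarkStrategy).
For a custom strategy, only the WatermarkGenerator interface needs to be implemented. It has two methods. onEvent is called for every incoming event: its first parameter, event, is the event itself; the second, eventTimestamp, is the event's timestamp; the third, output, can emit a watermark via output.emitWatermark. onPeriodicEmit is called periodically, at the interval set with ExecutionConfig#setAutoWatermarkInterval, which is more efficient than emitting a watermark for every element. It takes a single WatermarkOutput parameter, output, which can likewise emit a watermark via output.emitWatermark.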

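@Public
public interface WatermarkGenerator<T> {

    // Called for every event; may emit a watermark via output.emitWatermark(...)
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    // Called periodically (every autoWatermarkInterval); may emit a watermark
    void onPeriodicEmit(WatermarkOutput output);
}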
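Besides the periodic pattern, a generator can emit directly from onEvent; Flink's documentation calls this a punctuated generator. The following is a minimal, Flink-free sketch of that idea: WatermarkOutputStub, WatermarkGeneratorStub, ListOutput and PunctuatedGenerator are hypothetical local stand-ins (not Flink's classes) so the code runs without Flink. It emits a watermark only when the maximum timestamp advances and leaves onPeriodicEmit empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local stand-ins for Flink's WatermarkOutput and WatermarkGenerator,
// so this sketch compiles and runs without Flink on the classpath.
interface WatermarkOutputStub {
    void emitWatermark(long timestamp);
}

interface WatermarkGeneratorStub<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutputStub output);
    void onPeriodicEmit(WatermarkOutputStub output);
}

// Collects every emitted watermark so the behaviour can be inspected.
class ListOutput implements WatermarkOutputStub {
    final List<Long> emitted = new ArrayList<>();

    @Override
    public void emitWatermark(long timestamp) {
        emitted.add(timestamp);
    }
}

// Punctuated generator: emits from onEvent, but only when the maximum timestamp advances.
class PunctuatedGenerator<T> implements WatermarkGeneratorStub<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutputStub output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
            // "-1" mirrors the built-in generators: an event at maxTimestamp is not late
            output.emitWatermark(maxTimestamp - 1);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutputStub output) {
        // Nothing to do here: every watermark is emitted from onEvent.
    }
}

class PunctuatedDemo {
    public static void main(String[] args) {
        PunctuatedGenerator<String> generator = new PunctuatedGenerator<>();
        ListOutput output = new ListOutput();
        long[] timestamps = {1L, 3L, 2L, 5L}; // made-up, slightly out of order
        for (long ts : timestamps) {
            generator.onEvent("event@" + ts, ts, output);
        }
        System.out.println(output.emitted); // [0, 2, 4]
    }
}
```

The late event with timestamp 2 produces no watermark because it does not advance the maximum. Emitting per event like this reacts immediately but can produce many more watermarks than the periodic approach.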
Bounded out-of-orderness strategy (forBoundedOutOfOrderness)

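This strategy is created with the static factory method WatermarkStrategy.forBoundedOutOfOrderness, which takes a Duration as the maximum out-of-orderness. The withTimestampAssigner method on the resulting strategy specifies how to extract the timestamp from each event.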

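// Example: use WatermarkStrategy.forBoundedOutOfOrderness in assignTimestampsAndWatermarks to extract timestamps and generate periodic watermarks
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// ClickEvent is the author's event POJO (not shown); it must provide a long getDateTime().
public class Test {
    public static void main(String[] args) throws Exception {
        // Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics (required in Flink 1.11; deprecated since 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit periodic watermarks every 10 ms
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // forBoundedOutOfOrderness generates watermarks periodically;
        // the bound gives delayed (out-of-order) events time to arrive.
        // BoundedOutOfOrdernessWatermarks implements WatermarkGenerator.
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                // Watermark strategy with a maximum out-of-orderness of 5 ms;
                // the <ClickEvent> type witness lets withTimestampAssigner see the event type
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                        .withTimestampAssigner(
                                // extractTimestamp defines how to read the timestamp from an event
                                new SerializableTimestampAssigner<ClickEvent>() {
                                    @Override
                                    public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                        return event.getDateTime();
                                    }
                                })
        );
        // Print the results
        streamTS.print();
        env.execute();
    }
}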

Core code of withTimestampAssigner:

    // Wraps this strategy so that the given assigner extracts each event's timestamp
    default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
    }

forBoundedOutOfOrderness delegates the actual generation logic to BoundedOutOfOrdernessWatermarks, an implementation of the WatermarkGenerator interface. Core code:

    
    // Creates a strategy for a bounded amount of out-of-orderness
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }
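Note that forBoundedOutOfOrderness does not return a generator; it returns a lambda that builds one. As far as I understand, Flink invokes this factory once per parallel instance, so each instance tracks its own maximum timestamp. The Flink-free sketch below shows that idea with java.util.function.Supplier; MaxTimestampTracker and StrategyFactoryDemo are hypothetical names, not Flink classes.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful generator: it keeps only the largest
// timestamp seen, the same state BoundedOutOfOrdernessWatermarks keeps.
class MaxTimestampTracker {
    long maxTimestamp = Long.MIN_VALUE;

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
}

class StrategyFactoryDemo {
    // Analogue of `(ctx) -> new BoundedOutOfOrdernessWatermarks<>(...)`:
    // the "strategy" is only a factory; each call builds a fresh generator.
    static Supplier<MaxTimestampTracker> strategy() {
        return MaxTimestampTracker::new;
    }

    public static void main(String[] args) {
        Supplier<MaxTimestampTracker> factory = strategy();
        MaxTimestampTracker subtaskA = factory.get(); // e.g. parallel instance 0
        MaxTimestampTracker subtaskB = factory.get(); // e.g. parallel instance 1
        subtaskA.onEvent(42L);
        System.out.println(subtaskA.maxTimestamp); // 42
        System.out.println(subtaskB.maxTimestamp == Long.MIN_VALUE); // true: state is not shared
    }
}
```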

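BoundedOutOfOrdernessWatermarks implements the two methods. onEvent receives each event's timestamp (already extracted by the timestamp assigner) and keeps the largest one seen so far in maxTimestamp. The periodically invoked onPeriodicEmit emits a watermark equal to maxTimestamp minus the maximum out-of-orderness outOfOrdernessMillis, minus a further 1 ms. The extra 1 ms means an event whose timestamp is exactly maxTimestamp - outOfOrdernessMillis is not yet considered late. Core code: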

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    // The maximum timestamp encountered so far
    private long maxTimestamp;

    // The maximum out-of-orderness this generator assumes
    private final long outOfOrdernessMillis;

    // Creates a generator with the given out-of-orderness bound
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
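The formula can be checked without a cluster. Below is a minimal Flink-free port of the class above; BoundedSketch and replay are hypothetical names. replay extracts timestamps with a function (playing the role of withTimestampAssigner) and calls onPeriodicEmit once before any data and then after every eventsPerTick events, a simplifying stand-in for the autoWatermarkInterval timer, so real Flink output depends on timing. With the article's timestamps 1 to 8 ms and a 5 ms bound, the final watermark is 8 - 5 - 1 = 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Flink-free port of the BoundedOutOfOrdernessWatermarks logic (hypothetical names).
class BoundedSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Same start value as Flink, so the first watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Returns the watermark instead of writing it to a WatermarkOutput.
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Extracts each timestamp with `assigner` and calls onPeriodicEmit once up front
    // and then after every `eventsPerTick` events (crude stand-in for the timer).
    static <T> List<Long> replay(List<T> events, ToLongFunction<T> assigner,
                                 long boundMillis, int eventsPerTick) {
        BoundedSketch generator = new BoundedSketch(boundMillis);
        List<Long> watermarks = new ArrayList<>();
        watermarks.add(generator.onPeriodicEmit());
        for (int i = 0; i < events.size(); i++) {
            generator.onEvent(assigner.applyAsLong(events.get(i)));
            if ((i + 1) % eventsPerTick == 0) {
                watermarks.add(generator.onPeriodicEmit());
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // The article's demo timestamps: 1..8 ms, in order.
        List<Long> timestamps = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        // 5 ms bound, one "tick" after every 4 events
        System.out.println(replay(timestamps, ts -> ts, 5L, 4)); // [-9223372036854775808, -2, 2]
    }
}
```

Calling replay with a bound of 0 and one tick after all 8 events yields a final watermark of 7, which is what forMonotonousTimestamps produces for the same data.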
Monotonously increasing timestamps strategy (forMonotonousTimestamps)

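This strategy is created with the static method WatermarkStrategy.forMonotonousTimestamps, which takes no parameters. It assumes timestamps arrive in ascending order and is equivalent to the forBoundedOutOfOrderness strategy with the maximum out-of-orderness outOfOrdernessMillis set to 0.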

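// Example: use WatermarkStrategy.forMonotonousTimestamps in assignTimestampsAndWatermarks to extract timestamps and generate periodic watermarks
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// ClickEvent is the author's event POJO (not shown); it must provide a long getDateTime().
public class Test {
    public static void main(String[] args) throws Exception {
        // Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics (required in Flink 1.11; deprecated since 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit periodic watermarks every 10 ms
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // forMonotonousTimestamps generates watermarks periodically,
        // equivalent to an out-of-orderness bound of 0 (outOfOrdernessMillis = 0);
        // its generator extends BoundedOutOfOrdernessWatermarks
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                // The <ClickEvent> type witness lets the lambda call event.getDateTime()
                WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.getDateTime())
        );
        // Print the results
        streamTS.print();
        env.execute();
    }
}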

forMonotonousTimestamps is implemented with the AscendingTimestampsWatermarks class. Core code:

    
    // Creates a strategy for monotonously ascending timestamps
    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

AscendingTimestampsWatermarks extends BoundedOutOfOrdernessWatermarks. Core code:

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    // Creates a new watermark generator for ascending timestamps
    public AscendingTimestampsWatermarks() {
        // Maximum out-of-orderness is 0, so the emitted watermark = maxTimestamp - 1
        super(Duration.ofMillis(0));
    }
}
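The practical cost of a 0 ms bound shows up with out-of-order input: any event whose timestamp is not ahead of the current watermark arrives behind it (a watermark t declares that no more events with timestamp <= t are expected), and windowed operators may drop such events. The Flink-free sketch below counts such late events for a bound of 0 (what forMonotonousTimestamps uses) and for 5 ms. LatenessDemo and countLate are hypothetical names, the timestamps are made up, and as a simplification the watermark is recomputed after every event rather than on Flink's periodic timer.

```java
import java.util.List;

class LatenessDemo {
    // Counts events whose timestamp is <= the watermark already emitted,
    // using the same formula as BoundedOutOfOrdernessWatermarks.
    // Simplification: the watermark is refreshed after every event.
    static int countLate(List<Long> timestamps, long boundMillis) {
        long maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late++;
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - boundMillis - 1;
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up out-of-order timestamps (ms): 2 and 3 arrive after 4, 6 and 7 after 8.
        List<Long> timestamps = List.of(1L, 4L, 2L, 3L, 5L, 8L, 6L, 7L);
        System.out.println(countLate(timestamps, 0L)); // 4 (bound 0, like forMonotonousTimestamps)
        System.out.println(countLate(timestamps, 5L)); // 0 (forBoundedOutOfOrderness with 5 ms)
    }
}
```

The trade-off is latency: a larger bound tolerates more disorder but holds the watermark, and therefore window results, further behind the newest event.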
No-watermark strategy (noWatermarks)

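WatermarkStrategy.noWatermarks()

This creates a strategy that generates no watermarks at all, which can be useful for pipelines that rely purely on processing time.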

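When an operator receives data from several upstream operators, it uses the minimum of their watermarks as its own watermark and checks that against the window trigger condition. If one upstream stream stops sending data, its watermark stops advancing and holds back that minimum; windows then never fire and keep buffering data in memory, which can eventually exhaust memory.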
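Flink provides the withIdleness method to mark a stream as idle: if no events arrive on a stream within the configured timeout, the stream is marked idle. Downstream operators then stop waiting for that stream's watermark and decide whether to fire windows based on the watermarks of the remaining active inputs. withIdleness is called on an existing strategy, for example:
WatermarkStrategy.<ClickEvent>forMonotonousTimestamps().withIdleness(Duration.ofMillis(5))
The code above sets the idle timeout to 5 ms. If no events arrive within that time, the stream is marked idle; once new events arrive and a watermark is again sent downstream, the stream becomes active again.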
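The min-over-inputs rule and the effect of idleness can be illustrated with a small model. This is a simplified, Flink-free sketch, not Flink's actual implementation; MultiInputWatermark, onWatermark, markIdle and outputWatermark are hypothetical names. It takes the minimum watermark over the inputs that are not idle and, like Flink, never lets the output watermark move backwards.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified, Flink-free model of a multi-input operator (hypothetical names):
// the output watermark is the minimum over non-idle inputs and never decreases.
class MultiInputWatermark {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastOutput = Long.MIN_VALUE;

    MultiInputWatermark(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // A watermark from an input also marks that input active again.
    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    // Called when an input's idle timeout expires.
    void markIdle(int input) {
        idle[input] = true;
    }

    // Current output watermark, or empty if every input is idle.
    OptionalLong outputWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive) {
            return OptionalLong.empty();
        }
        lastOutput = Math.max(lastOutput, min); // never move backwards
        return OptionalLong.of(lastOutput);
    }

    public static void main(String[] args) {
        MultiInputWatermark operator = new MultiInputWatermark(2);
        operator.onWatermark(0, 100L); // input 1 has sent nothing yet
        System.out.println(operator.outputWatermark().getAsLong() == Long.MIN_VALUE); // true: held back
        operator.markIdle(1); // input 1 hits its idle timeout
        System.out.println(operator.outputWatermark().getAsLong()); // 100
    }
}
```

Without the markIdle call the output watermark would stay at Long.MIN_VALUE indefinitely, which is exactly the stalled-window situation described above.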

Reprinted from www.mshxw.com.
Article URL: https://www.mshxw.com/it/618835.html