
Flink - Watermark

1. Overview

The method for adding watermarks to a DataStream:

assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

WatermarkStrategy (the watermark strategy) is an interface with one method that must be implemented:

WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

The WatermarkGenerator (watermark generator) returned by this method is also an interface, with two methods:

void onEvent(T event, long eventTimestamp, WatermarkOutput output);
void onPeriodicEmit(WatermarkOutput output);

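Written out in full (data is a DataStream<TestObj>):

import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

data.assignTimestampsAndWatermarks(new WatermarkStrategy<TestObj>() {
    @Override
    public WatermarkGenerator<TestObj> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<TestObj>() {
            @Override
            public void onEvent(TestObj event, long eventTimestamp, WatermarkOutput output) {
                // called once for every record
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // called periodically by the framework
            }
        };
    }
});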

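onEvent is called for every event and onPeriodicEmit is called periodically; either one can be used to generate watermarks.

To emit a watermark, call

watermarkOutput.emitWatermark(Watermark wm);

from either of the two methods above. Watermark has a single constructor that takes one long argument: the timestamp the watermark marks.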
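To make the difference between the two callbacks concrete, here is a small dependency-free simulation in plain Java. The classes below (CollectingOutput, Event, PunctuatedGenerator, PeriodicGenerator) are hypothetical stand-ins that only mirror the shape of Flink's WatermarkGenerator and WatermarkOutput; they are not Flink's API. The run loop, which calls onPeriodicEmit after every second event, is an assumption standing in for the framework's timer.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of the two WatermarkGenerator callbacks. NOT Flink's classes.
public class EmitDemo {
    // Stand-in for WatermarkOutput: records each emitted watermark timestamp.
    static final class CollectingOutput {
        final List<Long> emitted = new ArrayList<>();

        void emitWatermark(long timestamp) {
            emitted.add(timestamp);
        }
    }

    // Hypothetical event: an event-time timestamp plus a "marker" flag.
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    // Mirrors the two methods of Flink's WatermarkGenerator.
    interface Generator {
        void onEvent(Event event, long eventTimestamp, CollectingOutput output);

        void onPeriodicEmit(CollectingOutput output);
    }

    // Emits from onEvent, only when a marker event arrives.
    static final class PunctuatedGenerator implements Generator {
        public void onEvent(Event event, long eventTimestamp, CollectingOutput output) {
            if (event.marker) {
                output.emitWatermark(eventTimestamp);
            }
        }

        public void onPeriodicEmit(CollectingOutput output) {
            // Nothing: watermarks come only from onEvent.
        }
    }

    // Tracks the max timestamp in onEvent and emits it from onPeriodicEmit
    // (no out-of-orderness bound, to keep the sketch short).
    static final class PeriodicGenerator implements Generator {
        private long maxTimestamp = Long.MIN_VALUE;

        public void onEvent(Event event, long eventTimestamp, CollectingOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        public void onPeriodicEmit(CollectingOutput output) {
            if (maxTimestamp != Long.MIN_VALUE) {
                output.emitWatermark(maxTimestamp);
            }
        }
    }

    // Feeds the events and calls onPeriodicEmit after every second event.
    static List<Long> run(Generator generator, List<Event> events) {
        CollectingOutput output = new CollectingOutput();
        for (int i = 0; i < events.size(); i++) {
            Event e = events.get(i);
            generator.onEvent(e, e.timestamp, output);
            if (i % 2 == 1) {
                generator.onPeriodicEmit(output);
            }
        }
        return output.emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1000, false), new Event(2000, true),
                new Event(1500, false), new Event(3000, false));
        System.out.println(run(new PunctuatedGenerator(), events)); // [2000]
        System.out.println(run(new PeriodicGenerator(), events));   // [2000, 3000]
    }
}
```

Emitting in onPeriodicEmit is the usual choice, since emitting on every event can produce a very large number of watermarks; emitting from onEvent makes sense when particular records themselves carry the watermark information.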

2. Built-in Watermark generators

WatermarkStrategy.forMonotonousTimestamps();

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

Looking at the source, forMonotonousTimestamps() is simply forBoundedOutOfOrderness() with an out-of-orderness of 0:

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}

public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0L));
    }
}

So we only need to look at BoundedOutOfOrdernessWatermarks:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    // largest event timestamp seen so far
    private long maxTimestamp;
    // maximum allowed out-of-orderness, in milliseconds
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start low enough that the first watermark is Long.MIN_VALUE without underflowing
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));
    }
}

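For every event the generator keeps the largest timestamp seen so far, and periodically emits that maximum minus the maximum out-of-orderness (minus 1 ms) as the watermark. So an event that happened earlier but arrives after later events does not move the watermark, and it is still on time as long as its timestamp lags the largest timestamp seen so far by no more than the configured bound. Note that the bound applies to the gap in event time, not in arrival time.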
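The arithmetic can be checked with a standalone re-implementation of the generator above (plain Java, no Flink dependency; Generator and lateEvents are illustrative names). As a simplification it recomputes the watermark after every event, as if onPeriodicEmit ran continuously, and it treats an event as late when its timestamp is less than or equal to the current watermark. Passing a bound of 0 reproduces forMonotonousTimestamps().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java re-implementation of the BoundedOutOfOrdernessWatermarks logic,
// used to check which events end up behind the watermark. Not Flink's class.
public class BoundedOutOfOrdernessDemo {
    static final class Generator {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        Generator(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Same start value as the Flink source: first watermark is Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        // What onPeriodicEmit would emit right now.
        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    // Returns, in arrival order, the timestamps of events that are already at or
    // behind the watermark when they arrive.
    static List<Long> lateEvents(long outOfOrdernessMillis, long... timestamps) {
        Generator generator = new Generator(outOfOrdernessMillis);
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= generator.currentWatermark()) {
                late.add(ts);
            }
            generator.onEvent(ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Arrival order: 2500 and 1500 happened before 3000 but arrive after it.
        System.out.println(lateEvents(1000, 1000, 3000, 2500, 1500, 4000)); // [1500]
        // A bound of 0 behaves like forMonotonousTimestamps().
        System.out.println(lateEvents(0, 1000, 3000, 2500, 1500, 4000));    // [2500, 1500]
    }
}
```

With a 1000 ms bound, 2500 arrives after 3000 but lags it by only 500 ms, so it is on time; 1500 lags by 1500 ms and is already behind the watermark of 1999. With a bound of 0, both out-of-order events are late.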

The interval (in milliseconds) at which onPeriodicEmit is called can be set with:

env.getConfig().setAutoWatermarkInterval(long interval);

3. The withIdleness method

Usage:

WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofMinutes(1))

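This method addresses the following problem: if some partitions receive no data for a while, no watermarks are generated for them. Because a downstream operator's watermark is the minimum of the watermarks of all its parallel upstream inputs, the downstream watermark then stops advancing.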
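The effect of one stalled input can be shown with a simplified model of the merge: the operator takes the minimum watermark over its inputs and, once an input is marked idle, leaves that input out. This is an illustration under those assumptions, not Flink's internal implementation; in particular, reporting the all-idle case as Long.MIN_VALUE is a simplification made for this sketch.

```java
// Simplified model of how an operator merges watermarks from parallel inputs:
// the minimum over all inputs that are not marked idle. Not Flink's internal code.
public class MinWatermarkDemo {
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // Simplification: if every input is idle, report "no watermark" as Long.MIN_VALUE.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Input 2 stopped receiving data, so its watermark is stuck at 1000.
        long[] watermarks = {5000, 9000, 1000};
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, false})); // 1000
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, true}));  // 5000
    }
}
```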

How is this solved? The official documentation does not go into detail, so let's look at the source again:

default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    Preconditions.checkNotNull(idleTimeout, "idleTimeout");
    Preconditions.checkArgument(!idleTimeout.isZero() && !idleTimeout.isNegative(), "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> {
    private static final long serialVersionUID = 1L;
    private final WatermarkStrategy<T> baseStrategy;
    private final Duration idlenessTimeout;

    WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
        this.baseStrategy = baseStrategy;
        this.idlenessTimeout = idlenessTimeout;
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return this.baseStrategy.createTimestampAssigner(context);
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        // wrap the base strategy's generator so that idleness can be detected
        return new WatermarksWithIdleness<>(this.baseStrategy.createWatermarkGenerator(context), this.idlenessTimeout);
    }
}

public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final IdlenessTimer idlenessTimer;

    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }

    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        Preconditions.checkNotNull(idleTimeout, "idleTimeout");
        Preconditions.checkArgument(!idleTimeout.isZero() && !idleTimeout.isNegative(), "idleTimeout must be greater than zero");
        this.watermarks = Preconditions.checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.watermarks.onEvent(event, eventTimestamp, output);
        this.idlenessTimer.activity(); // called once per event
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // check whether the input has been idle for too long
        if (this.idlenessTimer.checkIfIdle()) {
            output.markIdle(); // mark this output as idle
        } else {
            this.watermarks.onPeriodicEmit(output); // otherwise delegate as usual
        }
    }

    @VisibleForTesting
    static final class IdlenessTimer {
        private final Clock clock;
        private long counter;
        private long lastCounter;
        private long startOfInactivityNanos;
        private final long maxIdleTimeNanos;

        IdlenessTimer(Clock clock, Duration idleTimeout) {
            this.clock = clock;

            long idleNanos;
            try {
                idleNanos = idleTimeout.toNanos();
            } catch (ArithmeticException ignored) {
                // the Duration is too large to express in nanoseconds
                idleNanos = Long.MAX_VALUE;
            }

            this.maxIdleTimeNanos = idleNanos;
        }

        public void activity() {
            ++this.counter; // one more event seen
        }

        public boolean checkIfIdle() {
            // the counter changed since the last check (events arrived in between)
            if (this.counter != this.lastCounter) {
                // remember the current count
                this.lastCounter = this.counter;
                // reset the start of the inactivity period
                this.startOfInactivityNanos = 0L;
                // not idle
                return false;
            // counter unchanged (no events) and the inactivity period has not started yet
            } else if (this.startOfInactivityNanos == 0L) {
                // start the inactivity period at the current relative system time in ns
                this.startOfInactivityNanos = this.clock.relativeTimeNanos();
                // not idle yet
                return false;
            // counter unchanged (no events) and the inactivity period has already started
            } else {
                // idle if more than maxIdleTimeNanos have passed since the inactivity period started
                return this.clock.relativeTimeNanos() - this.startOfInactivityNanos > this.maxIdleTimeNanos;
            }
        }
    }
}

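The details are in the comments above. In short: every event increments a counter, and each periodic check compares the counter with its value at the previous check. Once it has stayed unchanged for longer than idleTimeout, the generator calls markIdle() instead of emitting a watermark, and downstream operators stop waiting for this input when computing their minimum watermark.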
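To trace the timer step by step, the same logic can be copied into plain Java with a manually advanced clock. ManualClock is a hypothetical helper replacing Flink's Clock, and the 100 ns timeout is arbitrary; everything else follows the IdlenessTimer source quoted above.

```java
// Copy of the IdlenessTimer logic with a manually advanced clock, so the sequence
// of checkIfIdle() results can be traced. ManualClock is a hypothetical helper.
public class IdlenessTimerDemo {
    static final class ManualClock {
        // Start above 0 because the timer uses 0 as its "not started" marker.
        long nanos = 1;

        long relativeTimeNanos() {
            return nanos;
        }
    }

    static final class IdlenessTimer {
        private final ManualClock clock;
        private final long maxIdleTimeNanos;
        private long counter;
        private long lastCounter;
        private long startOfInactivityNanos;

        IdlenessTimer(ManualClock clock, long maxIdleTimeNanos) {
            this.clock = clock;
            this.maxIdleTimeNanos = maxIdleTimeNanos;
        }

        void activity() {
            counter++;
        }

        boolean checkIfIdle() {
            if (counter != lastCounter) {
                // events arrived since the last check: reset
                lastCounter = counter;
                startOfInactivityNanos = 0L;
                return false;
            } else if (startOfInactivityNanos == 0L) {
                // first quiet check: the countdown starts now
                startOfInactivityNanos = clock.relativeTimeNanos();
                return false;
            } else {
                return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
            }
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        IdlenessTimer timer = new IdlenessTimer(clock, 100); // idle timeout: 100 ns
        timer.activity();
        System.out.println(timer.checkIfIdle()); // false: new activity, counters synced
        clock.nanos = 51;
        System.out.println(timer.checkIfIdle()); // false: countdown starts at 51
        clock.nanos = 151;
        System.out.println(timer.checkIfIdle()); // false: 100 ns elapsed, not > 100
        clock.nanos = 152;
        System.out.println(timer.checkIfIdle()); // true: 101 ns > 100
        timer.activity();
        System.out.println(timer.checkIfIdle()); // false: new activity resets the timer
    }
}
```

Note that the countdown starts at the first check that finds no new events, not at the last event itself, so a source is reported idle somewhat later than idleTimeout after its last record, by up to a few watermark intervals.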

4. createTimestampAssigner 

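WatermarkStrategy, the argument to assignTimestampsAndWatermarks, has one more method worth noting:

default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context)

With Kafka/Kinesis sources this method does not need to be implemented, because the timestamp can be taken from the records themselves. In other cases, such as the example above, it has to be implemented by hand; otherwise the records carry no timestamp and event-time operations such as windows throw an exception.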
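A sketch of what a timestamp assigner does, again in plain Java. The TimestampAssigner interface here only mirrors the shape of Flink's (long extractTimestamp(T element, long recordTimestamp)), and TestObj with its eventTime field is a hypothetical version of the record type from the example. As far as I can tell from the Flink 1.11+ sources, the default createTimestampAssigner passes the record's existing timestamp through, which is why Kafka/Kinesis sources work without overriding it; for a source that attaches no timestamp, that value is the Long.MIN_VALUE "no timestamp" marker. In real Flink code the usual shortcut is WatermarkStrategy.withTimestampAssigner(...), e.g. .withTimestampAssigner((event, ts) -> event.getEventTime()), where getEventTime() is a hypothetical getter.

```java
// Plain-Java sketch of a timestamp assigner. The interface mirrors the shape of
// Flink's TimestampAssigner; TestObj and its eventTime field are hypothetical.
public class TimestampAssignerDemo {
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // The "no timestamp" marker value.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    static final class TestObj {
        final String id;
        final long eventTime; // event time in epoch milliseconds

        TestObj(String id, long eventTime) {
            this.id = id;
            this.eventTime = eventTime;
        }
    }

    // Pass-through: keep whatever timestamp the source attached to the record.
    static <T> TimestampAssigner<T> passThroughAssigner() {
        return (element, recordTimestamp) -> recordTimestamp;
    }

    // Custom: read the timestamp from a field of the record itself.
    static final TimestampAssigner<TestObj> FROM_FIELD = (obj, recordTimestamp) -> obj.eventTime;

    public static void main(String[] args) {
        TestObj obj = new TestObj("a", 1_700_000_000_000L);
        // A source that attaches no timestamp hands the assigner NO_TIMESTAMP.
        long passedThrough = TimestampAssignerDemo.<TestObj>passThroughAssigner()
                .extractTimestamp(obj, NO_TIMESTAMP);
        long fromField = FROM_FIELD.extractTimestamp(obj, NO_TIMESTAMP);
        System.out.println(passedThrough == NO_TIMESTAMP); // true
        System.out.println(fromField);                     // 1700000000000
    }
}
```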
