- Bounded out-of-orderness strategy (forBoundedOutOfOrderness)
- Monotonically increasing timestamps strategy (forMonotonousTimestamps)
- No-watermark strategy (noWatermarks)
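Since Flink 1.11, the recommended way to generate watermarks is WatermarkStrategy (a watermark generation strategy). After creating a DataStream, specify the strategy with the following method: assignTimestampsAndWatermarks(WatermarkStrategy<T>)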
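To define our own generation logic, we only need to implement the WatermarkGenerator<T> interface:
@Public
public interface WatermarkGenerator<T> {
    // Called for every event; may inspect the event and emit a watermark.
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    // Called periodically; may emit a new watermark.
    void onPeriodicEmit(WatermarkOutput output);
}
Bounded out-of-orderness strategy (forBoundedOutOfOrderness)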
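This strategy is created by calling the forBoundedOutOfOrderness method on WatermarkStrategy, which takes a Duration as the maximum out-of-orderness. The withTimestampAssigner method on WatermarkStrategy provides the hook for extracting the timestamp from each event.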
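// Example: extracting timestamps and generating periodic watermarks with
// WatermarkStrategy.forBoundedOutOfOrderness inside assignTimestampsAndWatermarks
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics (needed on 1.11; deprecated since 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit periodic watermarks every 10 milliseconds
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // WatermarkStrategy.forBoundedOutOfOrderness generates watermarks periodically
        // and tolerates out-of-order (delayed) events.
        // BoundedOutOfOrdernessWatermarks is the WatermarkGenerator implementation behind it.
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                // Watermark strategy with a maximum out-of-orderness of 5 milliseconds
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                        .withTimestampAssigner(
                                // Implement extractTimestamp of SerializableTimestampAssigner
                                // to specify how the timestamp is read from each event
                                new SerializableTimestampAssigner<ClickEvent>() {
                                    @Override
                                    public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                        return event.getDateTime();
                                    }
                                })
        );
        // Print the result
        streamTS.print();
        env.execute();
    }
}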
Core code of the withTimestampAssigner method:
default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
    checkNotNull(timestampAssigner, "timestampAssigner");
    return new WatermarkStrategyWithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
}
The forBoundedOutOfOrderness method delegates the actual generation logic to BoundedOutOfOrdernessWatermarks, an implementation of the WatermarkGenerator interface. Core code:
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
    return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
BoundedOutOfOrdernessWatermarks provides two methods. onEvent is called for every event and keeps track of the largest timestamp seen so far, maxTimestamp. onPeriodicEmit is triggered periodically and emits a watermark equal to maxTimestamp minus the maximum out-of-orderness outOfOrdernessMillis, minus 1 more millisecond. Core code:
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
Monotonically increasing timestamps strategy (forMonotonousTimestamps)
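This strategy is created by calling the forMonotonousTimestamps method on WatermarkStrategy. It takes no arguments and is equivalent to the forBoundedOutOfOrderness strategy with the maximum out-of-orderness outOfOrdernessMillis set to 0.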
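To see what the out-of-orderness bound buys us, here is a small plain-Java sketch that mirrors the arithmetic of BoundedOutOfOrdernessWatermarks without depending on Flink. BoundedTracker, countBehindWatermark and the sample arrival order are hypothetical names and data made up for illustration. The sketch also assumes the watermark is recomputed after every event, whereas Flink only emits it on the periodic interval, so treat it as an approximation of the behaviour rather than the real operator.

```java
import java.util.List;

public class WatermarkArithmeticSketch {

    /** Plain-Java stand-in for the generator logic quoted above; not a Flink class. */
    static final class BoundedTracker {
        private final long boundMillis;
        private long maxTimestamp;

        BoundedTracker(long boundMillis) {
            this.boundMillis = boundMillis;
            // Same starting point as the quoted source, so the first watermark is Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            // watermark = maxTimestamp - bound - 1
            return maxTimestamp - boundMillis - 1;
        }
    }

    /** Counts events whose timestamp is at or below the watermark at the moment they arrive. */
    static int countBehindWatermark(List<Long> arrivals, long boundMillis) {
        BoundedTracker tracker = new BoundedTracker(boundMillis);
        int behind = 0;
        for (long ts : arrivals) {
            if (ts <= tracker.currentWatermark()) {
                behind++;
            }
            tracker.onEvent(ts);
        }
        return behind;
    }

    public static void main(String[] args) {
        // Hypothetical arrival order of event timestamps (milliseconds).
        List<Long> outOfOrder = List.of(1L, 4L, 2L, 9L, 3L, 7L);
        // Bound of 5 ms, as in the forBoundedOutOfOrderness example.
        System.out.println("bound=5: " + countBehindWatermark(outOfOrder, 5)); // bound=5: 1
        // Bound of 0 ms, which is what forMonotonousTimestamps amounts to.
        System.out.println("bound=0: " + countBehindWatermark(outOfOrder, 0)); // bound=0: 3
    }
}
```

With a bound of 5 only the event with timestamp 3 arrives behind the watermark (the watermark is 9 - 5 - 1 = 3 at that point), while with a bound of 0 the events 2, 3 and 7 all do. This is why the monotonous strategy only suits sources whose timestamps really are ascending. Whether such an event is actually dropped depends on the downstream operator.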
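// Example: extracting timestamps and generating periodic watermarks with
// WatermarkStrategy.forMonotonousTimestamps inside assignTimestampsAndWatermarks
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics (needed on 1.11; deprecated since 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit periodic watermarks every 10 milliseconds
        env.getConfig().setAutoWatermarkInterval(10L);
        // Parallelism 1
        env.setParallelism(1);
        // Demo data
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 4L, 4),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 8L, 8)
        );
        // WatermarkStrategy.forMonotonousTimestamps generates watermarks periodically,
        // equivalent to outOfOrdernessMillis = 0.
        // Its generator extends BoundedOutOfOrdernessWatermarks.
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                // The explicit <ClickEvent> type witness lets the lambda below see ClickEvent's methods
                WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.getDateTime())
        );
        // Print the result
        streamTS.print();
        env.execute();
    }
}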
forMonotonousTimestamps is implemented with the AscendingTimestampsWatermarks class. Core code:
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}
AscendingTimestampsWatermarks extends the BoundedOutOfOrdernessWatermarks class. Core code:
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        // Maximum out-of-orderness is 0, so the generated watermark = maxTimestamp - 1
        super(Duration.ofMillis(0));
    }
}
No-watermark strategy (noWatermarks)
WatermarkStrategy.noWatermarks()
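When an operator receives data from several upstream operators, it takes the minimum of the upstream watermarks as its own watermark and then checks whether any window can fire. If one upstream stops producing events, its watermark stops advancing and holds back that minimum, so the trigger condition is never met. The windows then keep buffering large amounts of data in memory, which can lead to problems such as running out of memory.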
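Flink provides the withIdleness method to mark a stream as idle. If no events arrive on a stream within the configured timeout, that stream is marked idle. Downstream operators then no longer wait for watermarks from it and instead use the watermarks of the other, still active upstream streams to decide whether a window should fire.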
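withIdleness is called on an existing strategy, for example:
WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5)).withIdleness(Duration.ofMillis(5))
The code above sets the timeout to 5 milliseconds. If no events arrive, and therefore no new watermark is generated, for longer than that, the stream is marked idle. It is marked active again the next time a new watermark is generated and sent downstream.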
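The effect of idleness on a multi-input operator can be sketched in plain Java. This is only an illustration of the "minimum over active inputs" rule described above, not Flink's internal implementation. The Input class, its fields and the sample watermark values are hypothetical.

```java
import java.util.List;
import java.util.OptionalLong;

public class IdleInputSketch {

    /** Hypothetical view of one upstream input: its last watermark and whether it is marked idle. */
    static final class Input {
        final String name;
        final long watermark;
        final boolean idle;

        Input(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over the active inputs; empty when every input is idle. */
    static OptionalLong combinedWatermark(List<Input> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        // Input "b" is stuck at watermark 20 because it has stopped receiving events.
        List<Input> allActive = List.of(new Input("a", 100L, false), new Input("b", 20L, false));
        List<Input> bMarkedIdle = List.of(new Input("a", 100L, false), new Input("b", 20L, true));

        // Without idleness the stalled input holds the operator's watermark back.
        System.out.println(combinedWatermark(allActive).getAsLong());   // 20
        // Once "b" is marked idle, only "a" is considered and the watermark can advance.
        System.out.println(combinedWatermark(bMarkedIdle).getAsLong()); // 100
    }
}
```

In this sketch a window ending at timestamp 50 could never fire while input "b" counts as active, but fires once "b" is marked idle.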



