周期性生成 watermark 的策略（forBoundedOutOfOrderness）:
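指定一个固定的最大乱序容忍时间（OutOfOrderness），此时 watermark = 当前最大元素时间戳 − OutOfOrderness（Flink 内置的 BoundedOutOfOrdernessWatermarks 实际还会再减去 1 毫秒）。当 watermark ≥ 窗口结束时间（准确地说是窗口的 maxTimestamp，即结束时间 − 1 毫秒）时，窗口被触发进行计算，这一逻辑由事件时间窗口默认的 EventTimeTrigger 实现。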
Watermark 通过乱序容忍时间来延迟窗口的触发；而 allowedLateness 是窗口的一种属性，用于决定窗口已经触发之后才到达的迟到数据如何处理：默认直接丢弃迟到数据；设置 allowedLateness 后，在允许延迟范围内到达的迟到数据会重新触发窗口计算；超出允许延迟范围的数据还可以通过 sideOutputLateData 输出到侧输出流。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.*;
import java.util.Random;
public class WaterMarkBoundedOutOfOrderness {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置生成Watermark的时间间隔,基于性能的考虑
env.getConfig().setAutoWatermarkInterval(100L);
DataStreamSource> stringDataStreamSource = env.addSource(new SourceFunction>() {
volatile boolean flag = true;
@Override
public void run(SourceContext> sourceContext) throws Exception {
String[] s = {"张三","王五","李四","秋英"};
while(flag) {
Thread.sleep(1000);
int i = new Random().nextInt(4);
sourceContext.collect(new Tuple2(s[i],System.currentTimeMillis()));
}
}
@Override
public void cancel() {
flag = false;
}
});
//在流上设置watermark生成策略,固定时间间隔策略,也就是最大容忍延迟时间
SingleOutputStreamOperator> tuple2SingleOutputStreamOperator = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner>() {
//抽取时间戳
@Override
public long extractTimestamp(Tuple2 integerLongTuple2, long l) {
return integerLongTuple2.f1;
}
}));
tuple2SingleOutputStreamOperator.map(new MapFunction, Tuple3>() {
@Override
public Tuple3 map(Tuple2 stringLongTuple2) throws Exception {
System.out.println(stringLongTuple2.f0 + stringLongTuple2.f1+" "+System.currentTimeMillis());
return new Tuple3(stringLongTuple2.f0,stringLongTuple2.f1,1);
}
}).keyBy(new KeySelector, String>() {
@Override
public String getKey(Tuple3 s) throws Exception {
return s.f0;
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum(2)
.print();
env.execute("watermark test");
}
}
=====================分割线=========================
以下代码实现了与 forBoundedOutOfOrderness 类似的逻辑，但所使用的 AssignerWithPeriodicWatermarks 接口已经过时（自 Flink 1.11 起被 WatermarkStrategy 取代）:
SingleOutputStreamOperator> tuple2SingleOutputStreamOperator = streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks >() { Long currentWatermark = 0l; Long mxTimestamp = Long.MIN_VALUE; Long maxOutOfOrderness = 1000l; //抽取时间戳 @Override public long extractTimestamp(Tuple2 s, long l) { System.out.println("(" + s.f0 + "," + s.f1 + ")"); System.out.println("当前的watermark: " + currentWatermark); return mxTimestamp = Math.max(s.f1, mxTimestamp); } //生成watermark @Nullable @Override public Watermark getCurrentWatermark() { currentWatermark = mxTimestamp - maxOutOfOrderness; System.out.println("当前产生的watermark: " + currentWatermark); return new Watermark(currentWatermark); } });



