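When Flink runs with parallelism greater than 1 and one parallel input receives no data, windows that do have data will not fire, even if their own watermark has passed the trigger boundary. The cause is that the watermarks of the parallel inputs are not aligned: a downstream operator's watermark is the minimum of the watermarks of all its inputs, and an input that receives no data never advances its watermark. Such an empty input is what this post calls an idle window (Flink calls it an idle input or idle source). Setting an idleness timeout with withIdleness lets the windows that have data fire.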
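To make the minimum rule concrete, here is a toy model in plain Java, not Flink code. Each upstream input is represented by a hypothetical `Input` class holding its current watermark and an idle flag, and the hypothetical `combinedWatermark` takes the minimum over the non-idle inputs. It only approximates Flink's runtime: how idleness is detected, and what is emitted when every input is idle, are simplified here.

```java
import java.util.List;

/** Toy model (not Flink code): a downstream watermark is the minimum over non-idle inputs. */
public class IdleWatermarkDemo {

    /** Hypothetical snapshot of one upstream input channel. */
    static final class Input {
        final long watermark;
        final boolean idle;

        Input(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark of the active inputs; Long.MIN_VALUE if none is active (simplification). */
    static long combinedWatermark(List<Input> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (in.idle) {
                continue; // idle inputs are left out of the minimum, which is what withIdleness enables
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Task 1 has seen an event at 15 s (watermark 15000 - 5000 - 1 = 9999); task 2 has seen nothing.
        List<Input> blocked = List.of(new Input(9_999L, false), new Input(Long.MIN_VALUE, false));
        List<Input> withIdle = List.of(new Input(9_999L, false), new Input(Long.MIN_VALUE, true));
        System.out.println(combinedWatermark(blocked));  // Long.MIN_VALUE: window [0, 10000) cannot fire
        System.out.println(combinedWatermark(withIdle)); // 9999: window [0, 10000) can fire
    }
}
```

Without the idle flag, the empty input pins the minimum at `Long.MIN_VALUE`; once it is marked idle, the minimum is taken over the remaining input only.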
Settings:
parallelism: 2
windowSize: 10s
forBoundedOutOfOrderness: 5s
withIdleness: 10s
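Trigger range of window n: [n*size, (n+1)*size+5s). Events in this range do not yet fire window n; the first event with a timestamp ≥ (n+1)*size+5s does, because it raises the watermark (largest timestamp - 5s - 1ms) to (n+1)*size - 1ms.
Data range of window n: [n*size, (n+1)*size)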
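The boundary arithmetic can be checked with a few lines of plain Java. The sketch assumes Flink's bounded-out-of-orderness rule (watermark = largest timestamp seen - bound - 1 ms) and that an event-time window [start, end) fires once the watermark reaches end - 1. The class and method names are illustrative, not Flink API.

```java
/** Trigger arithmetic for 10 s tumbling windows with a 5 s out-of-orderness bound. */
public class WindowTriggerMath {

    static final long SIZE_MS = 10_000L;  // windowSize: 10s
    static final long BOUND_MS = 5_000L;  // forBoundedOutOfOrderness: 5s

    /** Start of the tumbling window (zero offset) that a timestamp belongs to. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, SIZE_MS);
    }

    /** Watermark after the largest timestamp seen so far is maxTimestampMs. */
    static long watermark(long maxTimestampMs) {
        return maxTimestampMs - BOUND_MS - 1;
    }

    /** True once the window starting at windowStartMs fires under the given watermark. */
    static boolean fired(long windowStartMs, long watermarkMs) {
        long end = windowStartMs + SIZE_MS;
        return watermarkMs >= end - 1;
    }

    /** Smallest event timestamp that fires window n: (n + 1) * size + bound. */
    static long firingTimestamp(long n) {
        return (n + 1) * SIZE_MS + BOUND_MS;
    }

    public static void main(String[] args) {
        System.out.println(firingTimestamp(0));            // 15000
        System.out.println(fired(0, watermark(14_999L)));  // false (watermark 9998)
        System.out.println(fired(0, watermark(15_000L)));  // true  (watermark 9999)
        System.out.println(windowStart(25_000L));          // 20000
    }
}
```

An event at 15 s produces watermark 9999, which is exactly the `watermark: 9999` printed when window [0, 10000) fires in the demo output.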
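Input (key, event time in seconds):
a 1
a 10
a 15  -- reaches the trigger boundary of the first window, but because there are two tasks and the other task has no data, the window does not fire right away
a 20  -- task 2 data; belongs to window [20000, 30000)
b 1   -- late: window [0, 10000) has already fired, so the record is dropped
b 19  -- task 1 data for the second window [10000, 20000)
b 25  -- task 1 data; raises the watermark to 19999, the trigger boundary of the second window, which fires
Output: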
2> (a,1)
======================= ^^^ window_num: 0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
Only one task's window has data, so the window fires only after the 10 s idleness timeout has passed.

1> (b,19)
2> (a,10)
======================= ^^^ window_num: 0 ^^^ ==================
======================= range: [10000___20000 )
======================= key: b
2> (side-a,10)
======================= watermark: 19999
======================================================
2> (a,15)
======================= ^^^ window_num: 1 ^^^ ==================
======================= range: [10000___20000 )
======================= key: a
======================= watermark: 19999
======================================================
Both tasks' windows have data, so as soon as one of them receives an event at the trigger timestamp, the windows on both tasks fire. (The printouts of the two subtasks are interleaved.)

Demo:
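import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

// Placeholder class name: the original post does not show the class declaration
public class IdlenessWindowDemo {

    // Side-output tag for the even values emitted by the window function
    private static final OutputTag<Tuple2<String, Integer>> outputTag =
            new OutputTag<Tuple2<String, Integer>>("dayle_data2") {};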
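    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Random test source; the pipeline below reads from a socket and does not use it
        DataStreamSource<Tuple2<String, Integer>> dataStreams = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private final Random random = new Random();
                    private volatile boolean running = true;
                    private int tp2 = 0;
                    private int speed = 0;
                    private int f1 = -1;

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (running) {
                            TimeUnit.SECONDS.sleep(1);
                            // Roughly increasing value with a random jitter of 0..6
                            tp2 = Math.abs(random.nextInt() % 7);
                            f1 = Math.abs(++speed + tp2);
                            ctx.collect(Tuple2.of("a", f1));
                            System.out.println("source generator :\t" + f1);
                        }
                    }

                    @Override
                    public void cancel() {
                        // Lets run() leave its loop when the job is cancelled
                        running = false;
                    }
                });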
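        // Must be an anonymous subclass (note the {}) so Flink can capture the element type despite erasure;
        // the String element type is an assumption, the original tag was raw
        OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData") {};
        SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
                .socketTextStream("localhost", 10086)
                // Route malformed records to the side output (FlatMapProcess is not shown in this post)
                .process(new FlatMapProcess(erroOutputTag));
        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);
        // Print the side outputs
        socketTextStream.getSideOutput(erroOutputTag).print();
        tuple2Stream.getSideOutput(outputTag).print();
        // Main stream of normally processed records
        tuple2Stream.print();
        env.execute("event time process ");
    }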
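    private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(
            SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
        return dataStreams
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
                                // f1 holds seconds; Flink timestamps are in milliseconds
                                .withTimestampAssigner((e, t) -> e.f1 * 1000L)
                                // With parallelism > 1, a downstream operator's watermark is the minimum over
                                // all of its inputs. If one parallel input receives no data, its watermark never
                                // advances, so no window downstream can fire and all event-time computation stalls.
                                // withIdleness marks an input as idle once it has produced no records for the
                                // given time (10 s here); idle inputs are left out of the minimum, so windows
                                // fed by the other inputs can fire.
                                .withIdleness(Duration.ofSeconds(10))
                )
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Optional: keep accepting late records for a window for 5 s after it fires
                // .allowedLateness(Time.seconds(5))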
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    // Number of windows fired by this parallel instance
                    private int i;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        i = 0;
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements,
                                        Collector<Tuple2<String, Integer>> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            out.collect(next);
                            if (next.f1 % 2 == 0) {
                                // Also send even values to the side output
                                context.output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
                            }
                        }
                        System.out.println("======================= ^^^ window_num:\t" + i++ + " ^^^ ==================");
                        System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
                        System.out.println("======================= key: " + s);
                        System.out.println("======================= watermark: " + context.currentWatermark());
                        System.out.println("======================================================");
                    }
                });
    }
}
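To see why b 1 is dropped and which records land in which window, the example input can be replayed through a single-watermark model in plain Java. This is a simplification under stated assumptions: one input instead of two parallel tasks (so idleness plays no role), and the watermark is updated after every record rather than periodically as Flink does. The class `ExampleReplay` and its log format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-input replay of the example: window assignment, late drops, and firings. */
public class ExampleReplay {

    static final long SIZE_MS = 10_000L;  // windowSize: 10s
    static final long BOUND_MS = 5_000L;  // forBoundedOutOfOrderness: 5s

    /** Replays "key seconds" lines and returns a log of firings and dropped records. */
    static List<String> replay(List<String> lines) {
        List<String> log = new ArrayList<>();
        // window start -> (key -> buffered values), ordered by window start
        TreeMap<Long, TreeMap<String, List<Integer>>> pending = new TreeMap<>();
        long watermark = Long.MIN_VALUE;

        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            String key = parts[0];
            int seconds = Integer.parseInt(parts[1]);
            long ts = seconds * 1000L;                       // same conversion as the timestamp assigner
            long start = ts - Math.floorMod(ts, SIZE_MS);    // tumbling window start

            // Late if the window's last timestamp (end - 1) is already covered by the watermark
            if (start + SIZE_MS - 1 <= watermark) {
                log.add("dropped " + key + " " + seconds);
            } else {
                pending.computeIfAbsent(start, s -> new TreeMap<>())
                       .computeIfAbsent(key, k -> new ArrayList<>())
                       .add(seconds);
            }

            // Advance the watermark (never backwards), then fire every window it now covers
            watermark = Math.max(watermark, ts - BOUND_MS - 1);
            while (!pending.isEmpty() && pending.firstKey() + SIZE_MS - 1 <= watermark) {
                Map.Entry<Long, TreeMap<String, List<Integer>>> w = pending.pollFirstEntry();
                for (Map.Entry<String, List<Integer>> e : w.getValue().entrySet()) {
                    log.add("fire [" + w.getKey() + ", " + (w.getKey() + SIZE_MS) + ") key="
                            + e.getKey() + " " + e.getValue() + " watermark=" + watermark);
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a 1", "a 10", "a 15", "a 20", "b 1", "b 19", "b 25");
        replay(input).forEach(System.out::println);
    }
}
```

Running `main` prints four lines: window [0, 10000) fires for key a with [1] at watermark 9999, b 1 is dropped, and window [10000, 20000) fires for key a with [10, 15] and for key b with [19] at watermark 19999. These window contents match the demo output above; the model does not reproduce the idleness delay, which comes from the second parallel task.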



