This post shares my understanding of windows. Only with a clear grasp of them can you pick the right windowing approach for a given scenario.
I. Basic window concepts
1. What a window is
2. Window categories
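PS:
On a stream grouped by key, use window, which builds multiple windows (each key gets its own). On a non-keyed stream, use windowAll (the non-keyed APIs all carry the All suffix).
Example of the difference: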
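The difference can be sketched in plain Java, without the Flink API. The class KeyedVsAllSketch and its methods are hypothetical names made up for this illustration. They only mimic what a count window of size 2 would do on a keyed stream versus on a non-keyed stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of count windows; it does not use the Flink API. */
class KeyedVsAllSketch {

    /** Keyed case: every key fills its own window and fires once it holds `size` elements. */
    static List<String> keyed(String[] keys, int[] values, int size) {
        Map<String, List<Integer>> open = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            List<Integer> window = open.computeIfAbsent(keys[i], k -> new ArrayList<>());
            window.add(values[i]);
            if (window.size() == size) {
                fired.add(keys[i] + window); // key followed by the window contents
                window.clear();              // start the next window for this key
            }
        }
        return fired;
    }

    /** Non-keyed case: a single window over the whole stream, keys are ignored. */
    static List<String> nonKeyed(int[] values, int size) {
        List<Integer> window = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        for (int v : values) {
            window.add(v);
            if (window.size() == size) {
                fired.add(window.toString());
                window.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "b", "a"};
        int[] values = {1, 2, 3, 4, 5};
        System.out.println(keyed(keys, values, 2));   // [a[1, 3], b[2, 4]]
        System.out.println(nonKeyed(values, 2));      // [[1, 2], [3, 4]]
    }
}
```

In both cases the fifth element stays in a window that has not filled yet. In Flink itself, a non-keyed windowAll operation also runs with a parallelism of 1, which is worth keeping in mind when choosing between the two.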
Tumbling windows
Using tumbling windows
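To get a feel for how a tumbling window assigns elements, here is a minimal plain-Java sketch. It does not call Flink; TumblingSketch and assign are hypothetical names, and the sketch assumes windows aligned to the epoch with no offset. Each timestamp lands in exactly one fixed-size window.

```java
/** Plain-Java sketch of tumbling-window assignment; it does not use the Flink API. */
class TumblingSketch {

    /** Returns {start, end} of the single window [start, end) that contains the timestamp. */
    static long[] assign(long timestampMs, long sizeMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        return new long[] {start, start + sizeMs};
    }

    public static void main(String[] args) {
        // With a 5-second window, 4999 ms and 5000 ms fall into different windows.
        for (long ts : new long[] {0L, 4_999L, 5_000L, 7_000L}) {
            long[] w = assign(ts, 5_000L);
            System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

In Flink the corresponding assigners are TumblingEventTimeWindows and TumblingProcessingTimeWindows.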
Sliding windows
Using sliding windows
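Unlike a tumbling window, a sliding window can place one element into several overlapping windows. The plain-Java sketch below illustrates this under the assumption of epoch-aligned windows with no offset. SlidingSketch and windowStarts are hypothetical names and the code does not use the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding-window assignment; it does not use the Flink API. */
class SlidingSketch {

    /** Returns the start of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: the element at 7 s belongs to two windows.
        System.out.println(windowStarts(7_000L, 10_000L, 5_000L)); // [5000, 0]
    }
}
```

With a size of 10 seconds and a slide of 5 seconds every element is processed twice, so a small slide relative to the size multiplies the work per element.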
Session windows
Using session windows
PS:
Session windows can only be time-based.
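Because a session is defined by a period of inactivity, it only makes sense on a time axis. The plain-Java sketch below shows the idea for a single key. It assumes the timestamps arrive in ascending order and that a pause strictly longer than the gap closes the session; both are assumptions of this illustration, and Flink's own merging of session windows may treat the exact boundary differently. SessionSketch and sessions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of session grouping for one key; it does not use the Flink API. */
class SessionSketch {

    /** Groups ascending timestamps into sessions; a pause longer than gapMs starts a new one. */
    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long last = 0L;
        for (long ts : sortedTimestampsMs) {
            if (current == null || ts - last > gapMs) {
                current = new ArrayList<>(); // inactivity exceeded the gap: open a new session
                result.add(current);
            }
            current.add(ts);
            last = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clicks = {1_000L, 2_000L, 2_500L, 9_000L, 9_500L};
        System.out.println(sessions(clicks, 3_000L)); // [[1000, 2000, 2500], [9000, 9500]]
    }
}
```

Note that the sessions have no fixed size or start time, which is the main contrast with tumbling and sliding windows.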
Global windows
Compare the examples and note how the individual windows differ.
These predefined windows can be used in place of .window()
PS:
WindowFunction/AllWindowFunction are legacy interfaces carried over from early versions. They have now been superseded by ProcessWindowFunction/ProcessAllWindowFunction.
PS:
Deprecated; it is no longer officially recommended.
PS:
In version 1.14.4, which I currently use, it is already marked as deprecated and is no longer officially recommended.
This is the new generation of window function.
III. Triggers and evictors
1. What a trigger is
2. Fire and purge
3. Default triggers
4. Built-in and custom triggers
5. What evictors do
6. Built-in evictors
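How a trigger and an evictor cooperate can be sketched with a sliding count window: a trigger decides when the window fires, and an evictor removes elements before the window function sees them. The plain-Java code below is only a simulation with hypothetical names (TriggerEvictorSketch, slidingCount). It mirrors the combination of a global window, a count trigger that fires every slide elements, and a count evictor that keeps the last size elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation of a count trigger plus a count evictor; it does not use the Flink API. */
class TriggerEvictorSketch {

    /** Fires every `slide` elements and hands the last `size` elements to the window function. */
    static List<List<Integer>> slidingCount(int[] values, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>(); // the single, never-ending window
        List<List<Integer>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slide) {            // trigger: time to fire
                sinceLastFire = 0;
                while (buffer.size() > size) {       // evictor: drop the oldest elements
                    buffer.removeFirst();
                }
                fired.add(new ArrayList<>(buffer));  // what the window function would receive
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(slidingCount(values, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```

In Flink the matching building blocks are GlobalWindows, CountTrigger and CountEvictor. The first firing contains only two elements because the trigger fires before the window has collected three.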
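IV. Handling lateness and using window results
1. How to allow lateness
2. Getting late data
3. Notes on late elements
4. Working with window results
5. Interaction of watermarks and windows
6. State size considerations for windows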
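The interplay of watermark, window end and allowed lateness can be made concrete with a small plain-Java sketch. It assumes epoch-aligned tumbling event-time windows; LatenessSketch, classify and the three result labels are hypothetical names chosen for this illustration, not Flink API.

```java
/** Plain-Java sketch of how an element is treated relative to the watermark; not the Flink API. */
class LatenessSketch {

    static String classify(long timestampMs, long windowSizeMs,
                           long allowedLatenessMs, long watermarkMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long maxTimestamp = windowStart + windowSizeMs - 1; // last timestamp inside the window
        if (watermarkMs >= maxTimestamp + allowedLatenessMs) {
            return "TOO_LATE";      // window state is gone: dropped, or sent to a side output
        }
        if (watermarkMs >= maxTimestamp) {
            return "LATE_ACCEPTED"; // window already fired once; the element causes another firing
        }
        return "ON_TIME";           // window has not fired yet
    }

    public static void main(String[] args) {
        // 5 s windows with 2 s allowed lateness; the element at 4 s belongs to [0, 5000).
        System.out.println(classify(4_000L, 5_000L, 2_000L, 3_000L)); // ON_TIME
        System.out.println(classify(4_000L, 5_000L, 2_000L, 5_500L)); // LATE_ACCEPTED
        System.out.println(classify(4_000L, 5_000L, 2_000L, 7_000L)); // TOO_LATE
    }
}
```

In Flink, the lateness is configured with allowedLateness(...) on the windowed stream, and elements that are too late can be collected with sideOutputLateData(...). A late but accepted element produces an updated result for a window that has already emitted one, so downstream consumers need to cope with repeated results.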
V. Window function examples
1. AggregateFunction
package spendreport.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestAggFunctionOnWindow {

    // (class, student, English score). Java cannot create a generic array directly,
    // so the array is created with the raw type and the unchecked warning is suppressed.
    @SuppressWarnings("unchecked")
    private static final Tuple3<String, String, Double>[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100D),
        Tuple3.of("class1", "李四", 78D),
        Tuple3.of("class1", "王五", 99D),
        Tuple3.of("class2", "赵六", 81D),
        Tuple3.of("class2", "钱七", 59D),
        Tuple3.of("class2", "马二", 97D),
    };

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

        // Average English score per class.
        // countWindow(2) fires once a key has 2 elements, so the third score of each
        // class stays in a window that never fills and is not part of any result.
        DataStream<Double> avgScore = input.keyBy(
            new KeySelector<Tuple3<String, String, Double>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Double> value)
                    throws Exception {
                    return value.f0;
                }
            }).countWindow(2).aggregate(new AverageAggregate());

        // Print the result
        avgScore.print();
        // Run the job
        env.execute();
    }

    // Type parameters: input element, accumulator (sum, count), result
    private static class AverageAggregate implements
        AggregateFunction<Tuple3<String, String, Double>, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return new Tuple2<>(0D, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Tuple3<String, String, Double> value,
            Tuple2<Double, Long> accumulator) {
            // Update sum and count for every incoming element; the accumulator holds the intermediate result
            return new Tuple2<>(accumulator.f0 + value.f2, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1,
            Tuple2<Double, Long> acc2) {
            return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
        }
    }
}
2. ReduceFunction
package spendreport.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReduceFunctionOnWindow {

    // (class, student, English score). Java cannot create a generic array directly,
    // so the array is created with the raw type and the unchecked warning is suppressed.
    @SuppressWarnings("unchecked")
    private static final Tuple3<String, String, Integer>[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100),
        Tuple3.of("class1", "李四", 78),
        Tuple3.of("class1", "王五", 99),
        Tuple3.of("class2", "赵六", 81),
        Tuple3.of("class2", "钱七", 59),
        Tuple3.of("class2", "马二", 97),
    };

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH_TRANSCRIPT);

        // Total English score per class.
        // countWindow(2) only computes once a key has collected 2 elements.
        DataStream<Tuple3<String, String, Integer>> totalPoints = input.keyBy(
            new KeySelector<Tuple3<String, String, Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value)
                    throws Exception {
                    return value.f0;
                }
            }).countWindow(2).reduce(
            new ReduceFunction<Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> reduce(
                    Tuple3<String, String, Integer> v1,
                    Tuple3<String, String, Integer> v2) throws Exception {
                    // Keep class and name of the first element, add up the scores
                    return new Tuple3<>(v1.f0, v1.f1, v1.f2 + v2.f2);
                }
            });

        // Print the result
        totalPoints.print();
        // Run the job
        env.execute();
    }
}
3. ProcessWindowFunction
package spendreport.window;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class TestProcessWinFunctionOnWindow {

    // (class, student, English score). Java cannot create a generic array directly,
    // so the array is created with the raw type and the unchecked warning is suppressed.
    @SuppressWarnings("unchecked")
    private static final Tuple3<String, String, Double>[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100D),
        Tuple3.of("class1", "李四", 78D),
        Tuple3.of("class1", "王五", 99D),
        Tuple3.of("class2", "赵六", 81D),
        Tuple3.of("class2", "钱七", 59D),
        Tuple3.of("class2", "马二", 97D),
    };

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

        // Average English score per class
        DataStream<Double> avgScore = input.keyBy(
            new KeySelector<Tuple3<String, String, Double>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Double> value)
                    throws Exception {
                    return value.f0;
                }
            }).countWindow(2).process(new MyProcessWindowFunction());

        // Print the result
        avgScore.print();
        // Run the job
        env.execute();
    }

    // Type parameters: input element, output, key, window.
    // countWindow is built on a global window, hence GlobalWindow.
    public static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple3<String, String, Double>, Double, String, GlobalWindow> {

        @Override
        public void process(String key,
            Context context,
            Iterable<Tuple3<String, String, Double>> elements, Collector<Double> collector)
            throws Exception {
            // All elements of the window are available here; the computation happens only at the end
            double sum = 0D;
            long count = 0L;
            for (Tuple3<String, String, Double> element : elements) {
                sum += element.f2;
                count++;
            }
            collector.collect(sum / count);
        }
    }
}
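PS:
In this example, the third type parameter of MyProcessWindowFunction (KEY) must match the type returned by the KeySelector passed to keyBy(), which is String here.
Window functions can be written in many ways and are very flexible. When using them in a real business scenario, do not rush into coding; think the requirements through first.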
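One thing worth thinking through is how much state a window function keeps. The examples above contain both styles: the AggregateFunction updates a running sum and count for every element, while the ProcessWindowFunction receives all elements and computes only at the end. The plain-Java sketch below contrasts the two. It is an illustration with hypothetical names (AggregationStyleSketch, RunningAverage, BufferedAverage) and does not use the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java contrast of incremental and buffered aggregation; it does not use the Flink API. */
class AggregationStyleSketch {

    /** Incremental style: only a running (sum, count) pair is kept as state. */
    static final class RunningAverage {
        private double sum;
        private long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double result() {
            return sum / count;
        }
    }

    /** Buffered style: every element is kept until the window fires. */
    static final class BufferedAverage {
        private final List<Double> elements = new ArrayList<>();

        void add(double value) {
            elements.add(value);
        }

        double result() {
            double sum = 0;
            for (double v : elements) {
                sum += v;
            }
            return sum / elements.size();
        }

        int bufferedElements() {
            return elements.size();
        }
    }

    public static void main(String[] args) {
        RunningAverage running = new RunningAverage();
        BufferedAverage buffered = new BufferedAverage();
        for (double score : new double[] {100D, 78D}) {
            running.add(score);
            buffered.add(score);
        }
        System.out.println(running.result());           // 89.0
        System.out.println(buffered.result());          // 89.0
        System.out.println(buffered.bufferedElements()); // 2
    }
}
```

Both styles give the same average, but the buffered one grows with the number of elements in the window. When the result needs window metadata as well, Flink allows combining an incremental function with a ProcessWindowFunction, so only the pre-aggregated value is handed over when the window fires.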



