- 窗口 Top-N 示例（Flink 1.12）
package com.cn.stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
public class WindowTopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource Input = env.readTextFile("E:\大数据相关-学员参考\flinkdemo\src\main\resources\ok.txt");
SingleOutputStreamOperator> InputMap = Input.map(new MapFunction>() {
@Override
public Tuple3 map(String value) throws Exception {
String[] v = value.split(",");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
long timestampe = LocalDateTime.parse(v[0], formatter).toInstant(ZoneOffset.of("+8")).toEpochMilli();
return new Tuple3(timestampe, Float.valueOf(v[1]), v[2]);
}
}).uid("001-zlg-map01").name("切分数据源")
.assignTimestampsAndWatermarks(
WatermarkStrategy
.>forBoundedOutOfOrderness(
Duration.ofSeconds(5))
.withTimestampAssigner((event, timstamp) -> {
return event.f0;
}
).withIdleness(Duration.ofSeconds(1)));
SingleOutputStreamOperator>> process = InputMap
.rebalance()
.keyBy(new KeySelector, String>() {
@Override
public String getKey(Tuple3 value) throws Exception {
return value.f2;
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.aggregate(new aggregations(), new processWindowsA()).uid("aggregation").name("窗口聚合")
.keyBy(new KeySelector, Long>() {
@Override
public Long getKey(Tuple3 value) throws Exception {
return value.f2;
}
})
.process(new keywd()).uid("topc").name("求topn");
process.print();
env.execute("top n");
}
}
class aggregations implements AggregateFunction, Float, Float> {
//创建一个 累加器
@Override
public Float createAccumulator() {
float num = 0l;
return num;
}
// 创建累加规则
@Override
public Float add(Tuple3 value, Float accumulator) {
return accumulator + value.f1;
}
//返回的结果
@Override
public Float getResult(Float accumulator) {
return accumulator;
}
// 对不同的节点 计算结果的汇总
@Override
public Float merge(Float a, Float b) {
return a + b;
}
}
class processWindowsA implements WindowFunction, String, TimeWindow> {
//操作 里面有 状态的信息 返回 关闭的窗口的时间
@Override
public void apply(String s, TimeWindow window, Iterable input, Collector> out) throws Exception {
Float next = input.iterator().next();
// 获取 窗口结束的时候 目的是为了 触发 onTimer 去处排序和 取值问题
long end = window.getEnd();
out.collect(new Tuple3(next, s, end));
}
}
class keywd extends KeyedProcessFunction, List>> {
ListState> listState = null;
@Override
public void open(Configuration parameters) throws Exception {
// 在开始前就 初始化 状态 和状态描述器
ListStateDescriptor> tuple3ListStateDescriptor = new ListStateDescriptor>("top-n", TypeInformation.of(new TypeHint>() {
@Override
public TypeInformation> getTypeInfo() {
return super.getTypeInfo();
}
}));
listState = getRuntimeContext().getListState(tuple3ListStateDescriptor);
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction, List>>.onTimerContext ctx, Collector>> out) throws Exception {
// 将状态 状装入 集合并且清除 状态集合
List> list = new ArrayList<>();
Iterator> iterator = listState.get().iterator();
while (iterator.hasNext()) {
Tuple3 next = iterator.next();
list.add(next);
}
listState.clear();
//排序 取前三
List> collect = list.stream().sorted(new Comparator>() {
@Override
public int compare(Tuple3 o1, Tuple3 o2) {
return o1.f0 - o2.f0 < 0 ? 1 : -1;
}
}).limit(3).collect(Collectors.toList());
out.collect(collect);
}
@Override
public void processElement(Tuple3 value, KeyedProcessFunction, List>>.Context ctx, Collector>> out) throws Exception {
//将 数据 装入到集合中
listState.add(value);
//注册ontimer 执行时间 就是在 窗口 关闭的 后1 纳秒 就 执行
ctx.timerService().registerEventTimeTimer(value.f2 + 1);
}
}
测试数据（ok.txt，每行一条记录，格式：时间,金额,供应商）：
2020-04-15 08:05,4.00,supplier1
2020-04-15 08:06,4.00,supplier2
2020-04-15 08:07,2.00,supplier1
2020-04-15 08:08,2.00,supplier3
2020-04-15 08:09,5.00,supplier4
2020-04-15 08:11,2.00,supplier3
2020-04-15 08:13,1.00,supplier1
2020-04-15 08:15,3.00,supplier2
2020-04-15 08:17,6.00,supplier5
2020-04-15 08:25,6.00,supplier5
2020-04-15 08:30,6.00,supplier5
注意事项：第一，导包问题。Flink 同时提供 Java 和 Scala 两套 API，导包时一定要看清楚，不要导错，否则会导致聚合报错。
第二，类型问题。使用 Tuple 时很容易出现类型推断问题。参考程序中的做法（使用匿名内部类而不是 lambda，并用 TypeHint 显式声明状态类型），基本可以避免此类问题，缺点是写起来会繁琐一些。



