整体框架:
ProcessFunction:能拿到别的API拿不到的东西。
处理函数提供了一个定时服务TimerService,可以用它访问流中的事件event、时间戳timestamp,水位线watermark,注册定时事件。
处理函数继承自AbstractRichFunction,拥有富函数类的特性,可以访问状态state和其他运行时信息。
处理函数可以直接将数据输出到侧输出流side output中。
处理函数是DataStream API的底层逻辑。
ProcessFunction解析ProcessFunction类中: 1. processElement(输入类型,上下文,输入类型Collector) 上下文中能获取到:时间戳、侧输出流、"timerService" timerService中能获取到:currentProcessingTime处理时间、currentWatermark事件时间、registerProcessingTimeTimer注册定时器、删除定时器。 2. onTimer() 注册了定时器后,到点时会触发这个回调,这是定时到了后的处理方法。 但是,只有基于KeyedStream才能定义定时器。处理函数的分类
1. ProcessFunction 2. KeyedProcessFunction----"重要" 3. ProcessWindowFunction 4. ProcessAllWindowFunction 5. CoProcessFunction 6. ProcessJoinFunction 7. BroadcastProcessFunction 8. KeyedBroadcastProcessFunction按键分区处理函数 KeyedProcessFunction
stream.keyBy() .process(new MyKeyedProcessFunction())定时器Timer 和定时服务 TimerService
处理时间:
stream.keyBy(data->data.user) .process(new KeyedProcessFunction(){ @Override public void processElement(Event value, Context ctx, Collector out) throws Exception(){ Long currTs = ctx.timerService().currentProcessingTime(); out.collect(ctx.getCurrentKey() + " 数据到达,到达时间:" + new Timestamp(currTs)); // 注册一个10s后的定时器 ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception() { out.collect(ctx.getCurrentKey() + " 定时器触发,触发时间:"+new Timestamp(timestamp)); } })
事件时间:
stream.keyBy(data->data.user) .process(new KeyedProcessFunction窗口处理函数(){ @Override public void processElement(Event value, Context ctx, Collector out) throws Exception(){ Long currTs = ctx.timestamp(); out.collect(ctx.getCurrentKey() + " 数据到达,时间戳:" + new Timestamp(currTs) + " watermark:"+ ctx.timerService().currentWatermark()); // 注册一个10s后的定时器 ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception() { out.collect(ctx.getCurrentKey() + " 定时器触发,触发时间:"+new Timestamp(timestamp) + " watermark:"+ ctx.timerService().currentWatermark()); } })
窗口处理函数的使用
ProcessWindowFunction解析
要求:统计每隔10s的最受欢迎的URL的前两名,每隔5s更新一次结果。
使用ProcessAllWindowFunction
该方法数据量大的时候,把所有数据放在一个窗口里,不靠谱。
使用KeyedProcessFunction
// 1.按照URL分组,统计窗口内每个URL的访问量 SingleOutputStreamOperator侧输出流urlCountStream = stream .keyBy(data -> data.url) .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.xxxx()); // 2.对于同一窗口统计出的访问量,进行收集和排序 urlCountStream.keyBy(data -> data.windowEnd) // 窗口关闭时钟 .process(new TopNProcessResult(2)) .print(); // 实现TopNProcessResult public static class TopNProcessResult extends KeyedProcessFunction { // 定义属性N private int n; // 定义列表状态 private ListState urlViewCountListState; public TopNProcessResult(int n){ this.n = n; } // 在运行环境中获取状态 @Override public void open(Configuration parameters) throws Exception{ urlViewCountListState = getRuntimeContext().getListState( new ListStateDescriptor ("url-count-list", Types.POJO(UrlViewCount.class)) ; ); } @Override public void processElement(UrlViewCount value, Context ctx, Collector out) throws Exception{ // 来了数据后,将数据保存到状态中 urlViewCountListState.add(value); // 注册windowEnd + 1ms 的定时器 ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception{ // 从状态中获取数据 ArrayList urlViewCountArrayList = new ArrayList(); for(UrlViewCount u : urlViewCountListState.get()){ urlViewCountArrayList.add(u); } // 排序 urlViewCountArrayList.sort( new Comparator (){ @Override public int compare(UrlViewCount o1, UrlViewCount o2){ return o2.count.intValue() - o1.count.intValue(); } } ); // 包装信息打印输出 StringBuilder result = new StringBuilder(); result.append("窗口结束时间:" + new Timestamp(ctx.getCurrentKey())); // 取前两个 for(int i=0; i < 2; i++){ UrlViewCount currTuple = urlViewCountArrayList.get(i); String info = "No. "+(i+1)+" " + "url: "+ currTuple.url + " " + "访问量: "+ currTuple.count + "n"; result.append(info); } out.collect(result.toString()); } }
可以用来做分流操作
// 定义侧输出流标签 OutputTagoutputTag = new OutputTag ("side-output"){}; public void processElement(){ // 转换成Long, 输出到主流中 out.collect(Long.valueof(value)); // 转换成String, 输出到侧输出流中 ctx.output(outputTag, String.valueof(value)); } // 获得侧输出流 DataStream stringStream = longStream.getSideOutput(outputTag);



