Flink Window Aggregation Examples (Incremental and Full Aggregation)

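Earlier we classified windows by type, as time windows and count windows. Here we classify them from another angle: how they aggregate.
Window aggregation comes in two forms: incremental aggregation and full aggregation. Incremental aggregation updates the result every time an element enters the window, whereas full aggregation runs a single computation over all of the window's elements only when the window is triggered.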
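The difference between the two modes can be shown without any Flink classes. The following is a minimal plain-Java sketch, not Flink code: the class and method names (WindowAggregationSketch, onElement, onTrigger) are made up for illustration. The incremental version keeps one running value, while the full version buffers every element and computes only when the window fires.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; all names here are hypothetical, not Flink APIs.
class WindowAggregationSketch {

    // Incremental aggregation: the state is a single running value,
    // updated once for every element that enters the window.
    static final class IncrementalSum {
        private int sum = 0;
        private int updates = 0;

        void onElement(int value) {
            sum += value;
            updates++;
        }

        int onTrigger() {
            return sum; // the result is already computed when the window fires
        }

        int updates() {
            return updates;
        }
    }

    // Full aggregation: every element is buffered, and the
    // computation runs once, when the window fires.
    static final class FullSum {
        private final List<Integer> buffer = new ArrayList<>();

        void onElement(int value) {
            buffer.add(value);
        }

        int onTrigger() {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            return sum;
        }

        int buffered() {
            return buffer.size();
        }
    }

    public static void main(String[] args) {
        IncrementalSum inc = new IncrementalSum();
        FullSum full = new FullSum();
        for (int v : new int[]{3, 1, 4, 1, 5}) {
            inc.onElement(v);
            full.onElement(v);
        }
        System.out.println("incremental: " + inc.onTrigger() + " after " + inc.updates() + " updates");
        System.out.println("full: " + full.onTrigger() + " from " + full.buffered() + " buffered elements");
    }
}
```

Both produce the same sum; what differs is the state each one holds while the window is open (one integer versus the whole list of elements).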

1. Incremental aggregation: the result is updated once for every element that enters the window

Common incremental aggregation functions: reduce(reduceFunction), aggregate(aggregateFunction), sum(), min(), max()
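Of these, reduce requires the result to have the same type as the input, while aggregate allows a separate accumulator type. The sketch below illustrates the accumulator idea with an average. It assumes nothing beyond the JDK: the Aggregate interface is a local stand-in shaped like Flink's AggregateFunction<IN, ACC, OUT> (createAccumulator, add, getResult, merge), declared here so the example runs without Flink on the classpath. AverageAggregateSketch and averageOf are hypothetical names.

```java
// Plain-Java illustration; Aggregate is a local stand-in, not Flink's interface.
class AverageAggregateSketch {

    // Same four operations as Flink's AggregateFunction<IN, ACC, OUT>.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // The accumulator is {sum, count}; the output is their ratio.
    static final class Average implements Aggregate<Integer, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[]{0L, 0L};
        }

        @Override
        public long[] add(Integer value, long[] acc) {
            acc[0] += value; // running sum
            acc[1]++;        // running count
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[]{a[0] + b[0], a[1] + b[1]};
        }
    }

    // Feeds the values one at a time, the way elements arrive in a window.
    static double averageOf(int... values) {
        Average fn = new Average();
        long[] acc = fn.createAccumulator();
        for (int v : values) {
            acc = fn.add(v, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(averageOf(2, 4, 9));
    }
}
```

Only the two-element accumulator is kept between elements, so the state stays constant in size no matter how many elements the window receives.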

Example:

package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Unbounded stream processing, which Flink supports natively
public class SocketInfiniteWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
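        DataStreamSource<String> streamSource = env.socketTextStream("192.168.208.112", 8821, "\n");
        // Split each line into words and emit (word, 1) pairs
        DataStream<Tuple2<String, Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split("\\W+");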
                for(String word:split){
                    collector.collect(Tuple2.of(word,1));
                }
            }
        });
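        // keyBy(0) groups by the word; each key gets a 100-second time window
        DataStream<Tuple2<String, Integer>> result = windowCounts
                .keyBy(0)
                .timeWindow(Time.seconds(100))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
                        // t0 is the running result for this key, t1 is the newly arrived element
                        System.out.println("Thread: " + Thread.currentThread().getId() + " elements: " + t0 + "\t" + t1);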
                        return Tuple2.of(t0.f0,t0.f1+t1.f1);
                    }
                });
        result.print();
        env.execute("SocketInfiniteWindow2");
    }
}

2. Full aggregation
 

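Full aggregation starts computing only after all the data belonging to the window has arrived, which makes it possible to implement requirements such as sorting the window's elements. The common full aggregation functions are the apply method of WindowFunction and the process method of ProcessWindowFunction. Note that ProcessWindowFunction provides more contextual information than WindowFunction, through its Context object.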
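Sorting is a good example of a computation that needs the whole window at once. The following plain-Java sketch shows the idea under the assumption that the window's contents arrive as an Iterable, which is the shape of input a full-window function receives. SortedWindowSketch and sortedDescending are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; names are hypothetical, not Flink APIs.
class SortedWindowSketch {

    // Called once per window with every buffered element.
    // A running value cannot express this: the order is only known
    // once the last element of the window has been seen.
    static List<Integer> sortedDescending(Iterable<Integer> windowElements) {
        List<Integer> copy = new ArrayList<>();
        for (Integer v : windowElements) {
            copy.add(v);
        }
        copy.sort(Comparator.reverseOrder());
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sortedDescending(Arrays.asList(3, 1, 4, 1, 5)));
    }
}
```

The cost is that every element must be held until the window fires, so the state grows with the number of elements in the window.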

Example:

package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

// Unbounded stream processing, which Flink supports natively
public class SocketInfiniteWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
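        DataStreamSource<String> streamSource = env.socketTextStream("192.168.208.112", 8821, "\n");
        // Split each line into words and emit (word, 1) pairs
        DataStream<Tuple2<String, Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split("\\W+");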
                for(String word:split){
                    collector.collect(Tuple2.of(word,1));
                }
            }
        });
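        // Incremental aggregation (the previous example, kept for comparison)
//        DataStream<Tuple2<String, Integer>> result = windowCounts
//                .keyBy(0)
//                .timeWindow(Time.seconds(100))
//                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//                    @Override
//                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
//                        System.out.println("Thread: " + Thread.currentThread().getId() + " elements: " + t0 + "\t" + t1);
//                        return Tuple2.of(t0.f0, t0.f1 + t1.f1);
//                    }
//                });
//        result.print();
        // Full aggregation: the two key classes are ProcessWindowFunction and its Context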
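        DataStream<Tuple2<String, Integer>> result = windowCounts
                .keyBy(0)
                .timeWindow(Time.seconds(100))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> v2s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        System.out.println("------------");
                        // The Context exposes the window, printed here as formatted dates and as raw timestamps
                        System.out.println("Thread: " + Thread.currentThread().getId() + " window range: " + sdf.format(context.window().getStart()) + "\t" + sdf.format(context.window().getEnd()));
                        System.out.println("Thread: " + Thread.currentThread().getId() + " window range: " + context.window().getStart() + "\t" + context.window().getEnd());
                        // All elements of the window for this key are available at once; count them
                        int sum = 0;
                        for (Tuple2<String, Integer> v2 : v2s) {
                            sum += 1;
                        }
                        collector.collect(Tuple2.of(key.getField(0), sum));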
                    }
                });
        result.print();
        env.execute("SocketInfiniteWindow2");
    }
}
The two most important classes for full aggregation are ProcessWindowFunction and its Context.

If you use the apply method instead, there is no Context object; the window is passed in directly as a parameter:

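 // Requires: import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 DataStream<Tuple2<String, Integer>> result = windowCounts
            .keyBy(0)
            // Sliding window: 10 seconds long, advancing every 2 seconds
            .timeWindow(Time.seconds(10), Time.seconds(2))
            .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> v2s, Collector<Tuple2<String, Integer>> collect) {
                    int sum = 0;
                    for (Tuple2<String, Integer> v2 : v2s) {
                        sum += 1;
                    }
                    collect.collect(Tuple2.of(key.getField(0), sum));
                }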

            });
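The snippet above passes two arguments to timeWindow, a 10-second size and a 2-second slide, so the windows overlap and each element is counted in several of them. The following plain-Java sketch works through that arithmetic. It assumes windows aligned to the epoch with no offset and non-negative timestamps in milliseconds; SlidingWindowSketch and windowStarts are hypothetical names, and this is an illustration of the rule rather than Flink's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; names are hypothetical, not Flink APIs.
class SlidingWindowSketch {

    // Returns the start (in ms) of every window [start, start + size)
    // that contains the given timestamp, latest window first.
    // Assumes timestamp >= 0 and windows aligned to the epoch.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at t = 7000 ms with size 10 s and slide 2 s
        System.out.println(windowStarts(7_000L, 10_000L, 2_000L));
    }
}
```

With a size of 10 seconds and a slide of 2 seconds, every element falls into size / slide = 5 windows, so the apply function above sees each element five times, once per window it belongs to.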
