栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-join的三种方式

Flink-join的三种方式

Join

public class EventTumblingWindowJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource lines2 = env.socketTextStream("linux01", 8888);

        //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API

        //提取两个流的Watermark
        SingleOutputStreamOperator lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        //对两个流进行处理

        SingleOutputStreamOperator> tpStream1
                = lines1WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator> tpStream2
                = lines2WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        //将两个流join
        DataStream> result = tpStream1.join(tpStream2)
                .where(tp1 -> tp1.f2)   //第一个流keyBY的字段
                .equalTo(tp2 -> tp2.f1) //第二个流keyBy的字段
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))   //划分窗口
                //全量聚合的处理逻辑
                .apply(new JoinFunction, Tuple3, Tuple5>() {
                    //窗口触发后,条件相同的,并且在同一个窗口内的数据,会传入到join方法中
                    @Override
                    public Tuple5 join(Tuple3 first, Tuple3 second) throws Exception {
                        return Tuple5.of(first.f0,first.f1,first.f2,second.f0,second.f2);
                    }
                });

        result.print();

        env.execute();
    }
}

LeftOuterJoin

public class EventTumblingWindowLeftOuterJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource lines2 = env.socketTextStream("linux01", 8888);

        //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API

        //提取两个流的Watermark
        SingleOutputStreamOperator lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        //对两个流进行处理

        SingleOutputStreamOperator> tpStream1
                = lines1WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator> tpStream2
                = lines2WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        //将两个流leftOuterJoin
        DataStream> result = tpStream1.coGroup(tpStream2)
                .where(tp1 -> tp1.f2) //第一个流keyBy的字段
                .equalTo(tp2 -> tp2.f1)//第二个流keyBy的字段
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))//划分窗口
                .apply(new CoGroupFunction, Tuple3, Tuple5>() {
                    
                    @Override
                    public void coGroup(Iterable> first, Iterable> second, Collector> out) throws Exception {
                        for (Tuple3 left : first) {
                            //实现左外连接
                            //先循环左流的数据
                            boolean isEmpty = false;
                            for (Tuple3 right : second) {
                                isEmpty = true;
                                out.collect(Tuple5.of(left.f0, left.f1, left.f2, right.f0, right.f2));
                            }
                            if (!isEmpty) {
                                out.collect(Tuple5.of(left.f0, left.f1, left.f2, null, null));
                            }
                        }
                    }
                });

        result.print();

        env.execute();
    }
}

intervalJoin

public class EventTumblingWindowIntervalJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource lines2 = env.socketTextStream("linux01", 8888);

        //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API

        //提取两个流的Watermark
        SingleOutputStreamOperator lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        //对两个流进行处理

        SingleOutputStreamOperator> tpStream1
                = lines1WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator> tpStream2
                = lines2WithWatermark.map(new MapFunction>() {

            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        //将两个流join
        KeyedStream, String> keyedStream1 = tpStream1.keyBy(tp -> tp.f2);
        KeyedStream, String> keyedStream2 = tpStream2.keyBy(tp -> tp.f1);

        SingleOutputStreamOperator> result = keyedStream1.intervalJoin(keyedStream2)
                .between(Time.seconds(-1), Time.seconds(1))  //指定的时间范围
                .upperBoundExclusive() //不包括上界
                .process(new ProcessJoinFunction, Tuple3, Tuple5>() {
                    @Override
                    public void processElement(Tuple3 left, Tuple3 right, Context ctx, Collector> out) throws Exception {

                        out.collect(Tuple5.of(left.f0,left.f1,left.f2,right.f0,right.f2));

                    }
                });


        result.print();

        env.execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745031.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号