Flink Learning (13): Joining Data Streams with join

Contents
  • I. inner join
  • II. sliding-inner-join
  • III. session-inner-join
  • IV. left-join
  • V. interval-join


I. inner join

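        join connects the records of two streams. Records whose keys satisfy the where/equalTo condition and that fall into the same window are paired up, and each pair is passed to the join function when that window fires.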

1. Start nc

Open two listening ports to simulate two data sources:

nc -lp 8888
nc -lp 8899

2. Example

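// Imports used by the test methods in this article
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Test; // assumes JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

    @Test
    public void joinTumblingTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Stream 1: lines such as "a,1" (key, event timestamp in ms)
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                // f1 is the event timestamp; timestamps are expected to arrive in ascending order
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Stream 2: same format on port 8899
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.socketTextStream("172.16.10.159", 8899)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Join the two streams
        stream1.join(stream2)
                // Key of the first stream
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Key of the second stream
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Tumbling window, 10 ms
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        // (key, stream 1 timestamp, stream 2 timestamp)
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                })
                .print("join");
        env.execute("joinTumblingTest");
    }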

3. Test

Stream 1 input (each line is key,timestamp in ms):

a,1
a,5
b,6
a,10

Stream 2 input:

a,7
a,8
a,11

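The tumbling window (not a sliding one) is 10 ms long, so the first window covers [0, 10). It fires once the watermarks of both streams reach 9 (here after a,10 on stream 1 and a,11 on stream 2), and the records with timestamps 0 to 9 are joined. b,6 has no matching key in stream 2, and a,10 and a,11 belong to the next window [10, 20), which has not fired yet.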

Result:

join> (a,1,7)
join> (a,1,8)
join> (a,5,7)
join> (a,5,8)
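The result can be reproduced without a cluster. The plain-Java sketch below (no Flink dependency; the class and method names are illustrative) buckets the test records by key and by a 10 ms tumbling window aligned to multiples of the window size, and only emits windows the watermark has already passed. It assumes the operator watermark is the smaller of the two streams' (maximum timestamp - 1), which is what forMonotonousTimestamps() produces per stream, and it looks at the state after all records above have been entered rather than modelling the order in which windows fire.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a tumbling-window inner join (no Flink dependency).
class TumblingJoinSketch {

    // One input line such as "a,1": key plus event timestamp in ms.
    record Event(String key, long ts) {}

    // The test input from this section.
    static final List<Event> STREAM1 = List.of(
            new Event("a", 1), new Event("a", 5), new Event("b", 6), new Event("a", 10));
    static final List<Event> STREAM2 = List.of(
            new Event("a", 7), new Event("a", 8), new Event("a", 11));

    // Start of the tumbling window that contains ts (windows aligned to multiples of size).
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Assumed watermark: each stream emits (max timestamp - 1) and the
    // two-input join operator follows the smaller of the two.
    static long watermark(List<Event> left, List<Event> right) {
        long maxLeft = left.stream().mapToLong(Event::ts).max().orElse(Long.MIN_VALUE + 1);
        long maxRight = right.stream().mapToLong(Event::ts).max().orElse(Long.MIN_VALUE + 1);
        return Math.min(maxLeft, maxRight) - 1;
    }

    // Emits (key, left ts, right ts) for equal keys in the same window, but only
    // for windows whose last millisecond (end - 1) the watermark has reached.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        long wm = watermark(left, right);
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                long start = windowStart(l.ts(), size);
                boolean sameKey = l.key().equals(r.key());
                boolean sameWindow = start == windowStart(r.ts(), size);
                boolean fired = start + size - 1 <= wm;
                if (sameKey && sameWindow && fired) {
                    out.add("(" + l.key() + "," + l.ts() + "," + r.ts() + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        join(STREAM1, STREAM2, 10).forEach(System.out::println);
    }
}
```

Running main prints the same four pairs as the job; (a,10,11) is not printed because its window [10, 20) has not fired.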

II. sliding-inner-join

        Next, an inner join over sliding windows.

1. Example

The sliding window is 4 ms long and slides every 2 ms:

    @Test
    public void joinSlidingTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Stream 1: lines such as "a,2" (key, event timestamp in ms)
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Stream 2: same format on port 8899
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.socketTextStream("172.16.10.159", 8899)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Join the two streams
        stream1.join(stream2)
                // Key of the first stream
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Key of the second stream
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Sliding window: 4 ms long, slides every 2 ms
                .window(SlidingEventTimeWindows.of(Time.milliseconds(4), Time.milliseconds(2)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                })
                .print("sliding-inner-join");
        env.execute("joinSlidingTest");
    }

2. Test

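Stream 1 input: a,2 and a,4
Stream 2 input: a,3 and a,4

With a 4 ms window and a 2 ms slide, every record belongs to two windows (a,2, for example, falls into both [0, 4) and [2, 6)). Once the watermarks of both streams reach 3, the window [0, 4) fires and prints sliding-inner-join> (a,2,3).

Next:
Stream 1 input: a,5 and a,6
Stream 2 input: a,5 and a,6

The watermarks now advance by one 2 ms slide to 5, which fires the window [2, 6). Every pair of records with timestamps 2 to 5 from the two streams is joined, so (a,2,3) is emitted a second time together with the other pairs from that window.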
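The window arithmetic behind this test can be checked in plain Java. This sketch (illustrative names, no Flink dependency) enumerates the windows a timestamp belongs to with the same loop shape that SlidingEventTimeWindows uses with offset 0 (latest aligned start first, stepping back by the slide while the window still covers the timestamp), then lists the pairs produced when a given window fires. All records are assumed to share the key a.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    // Half-open window [start, end).
    record Window(long start, long end) {
        boolean contains(long ts) {
            return start <= ts && ts < end;
        }
    }

    // Every window of the given size and slide that contains ts (offset 0).
    static List<Window> assignWindows(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Pairs emitted when window w fires; all records share key "a" in this test.
    static List<String> joinWindow(List<Long> left, List<Long> right, Window w) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (w.contains(l) && w.contains(r)) {
                    out.add("(a," + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (long ts = 2; ts <= 6; ts++) {
            System.out.println(ts + " -> " + assignWindows(ts, 4, 2));
        }
        // Step 1: watermark = min(4, 4) - 1 = 3, so [0, 4) fires.
        System.out.println(joinWindow(List.of(2L, 4L), List.of(3L, 4L), new Window(0, 4)));
        // Step 2: watermark = min(6, 6) - 1 = 5, so [2, 6) fires.
        System.out.println(joinWindow(List.of(2L, 4L, 5L, 6L), List.of(3L, 4L, 5L, 6L), new Window(2, 6)));
    }
}
```

The first joinWindow call yields only (a,2,3); the second yields nine pairs (timestamps 2, 4, 5 on the left times 3, 4, 5 on the right), which is why sliding windows can emit the same pair more than once.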

III. session-inner-join

        Next, an inner join over session windows.

1. Example

Session windows with a 10 ms gap:

    @Test
    public void joinSessionTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Stream 1: lines such as "a,3" (key, event timestamp in ms)
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Stream 2: same format on port 8899
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.socketTextStream("172.16.10.159", 8899)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Join the two streams
        stream1.join(stream2)
                // Key of the first stream
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Key of the second stream
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Session window with a 10 ms gap
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                })
                .print("session-inner-join");
        env.execute("joinSessionTest");
    }

2. Test

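Stream 1 input: a,3 and a,14
Stream 2 input: a,5 and a,20

Records with the same key from both streams are assigned to session windows together, and records whose timestamps lie within 10 ms of each other end up in the same session. The gaps here (3, 5, 14, 20) are all below 10 ms, so the four records merge into a single session [3, 30). Nothing is printed until the watermarks of both streams reach 29, for example after entering a much later record such as a,50 in each stream; the join then emits (a,3,5), (a,3,20), (a,14,5) and (a,14,20).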
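The merging can be traced with a small plain-Java sketch (illustrative names, no Flink dependency). Each record opens a window [ts, ts + gap), and windows are merged in timestamp order while the next one starts at or before the current session's end. Treating windows that merely touch as mergeable is my reading of Flink's TimeWindow merge logic and should be taken as an assumption. Because join assigns windows to the union of both streams, the timestamps of stream 1 and stream 2 are merged together.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SessionWindowSketch {

    // Half-open session window [start, end).
    record Window(long start, long end) {
        boolean contains(long ts) {
            return start <= ts && ts < end;
        }
    }

    // Merge per-record windows [ts, ts + gap) into sessions. Assumption: a window
    // that starts at or before the current session's end is merged into it.
    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(Comparator.naturalOrder());
        List<Window> result = new ArrayList<>();
        for (long ts : sorted) {
            int lastIndex = result.size() - 1;
            if (lastIndex >= 0 && ts <= result.get(lastIndex).end()) {
                Window last = result.get(lastIndex);
                result.set(lastIndex, new Window(last.start(), Math.max(last.end(), ts + gap)));
            } else {
                result.add(new Window(ts, ts + gap));
            }
        }
        return result;
    }

    // Pairs emitted when the session fires; all records share key "a".
    static List<String> joinSession(List<Long> left, List<Long> right, Window session) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (session.contains(l) && session.contains(r)) {
                    out.add("(a," + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> stream1 = List.of(3L, 14L);
        List<Long> stream2 = List.of(5L, 20L);
        List<Long> all = new ArrayList<>(stream1);
        all.addAll(stream2);
        List<Window> merged = sessions(all, 10);
        System.out.println(merged);
        for (Window w : merged) {
            System.out.println(joinSession(stream1, stream2, w));
        }
    }
}
```

With a larger gap between two records (for example 3 and 20), the sketch produces two separate sessions instead of one.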

IV. left-join

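A left join of two streams. The join() used above only produces inner-join results, so the left join is built on coGroup(): for every key and window, the CoGroupFunction receives the records of both streams, even when one side is empty.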

1. Example

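    @Test
    public void leftJoinTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Stream 1: parse "key,timestamp" and assign event time (the map must not return null,
        // and event-time windows need timestamps and watermarks)
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Stream 2: same format on port 8899
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.socketTextStream("172.16.10.159", 8899)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Co-group the two streams
        stream1.coGroup(stream2)
                // Key of the first stream
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Key of the second stream
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // Tumbling window, 10 ms
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second,
                                        Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                        // Left join: keep every record of the first stream
                        for (Tuple2<String, Integer> left : first) {
                            boolean isJoin = false;
                            for (Tuple2<String, Integer> right : second) {
                                isJoin = true;
                                out.collect(new Tuple3<>(left.f0, left.f1, right.f1));
                            }
                            // No matching record on the right side
                            if (!isJoin) {
                                out.collect(new Tuple3<>(left.f0, left.f1, null));
                            }
                        }
                    }
                })
                .print("left join");
        env.execute("coGroupTest");
    }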

2. Test

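Once the data advances past the end of a 10 ms tumbling window (the watermarks of both streams reach the window's last millisecond), the window fires and the left join is emitted: each record of stream 1 in that window is paired with every matching record of stream 2, or with null when there is no match.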
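To see what the CoGroupFunction emits for one window, the sketch below runs the same loop in plain Java (illustrative names, no Flink dependency) on the section I test records that fall into the window [0, 10), grouping them by key first. Flink also calls coGroup for keys that appear only in the second stream, but with an empty first iterable, so those calls emit nothing; the sketch therefore iterates over the left-hand keys only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java rerun of the left-join coGroup logic for a single window.
class LeftJoinSketch {

    record Event(String key, int value) {}

    // Records of the window [0, 10) from the section I test input.
    static final List<Event> LEFT = List.of(new Event("a", 1), new Event("a", 5), new Event("b", 6));
    static final List<Event> RIGHT = List.of(new Event("a", 7), new Event("a", 8));

    // Same loop as coGroup() in the job: pair each left record with every right
    // record of the same key, or with null when the right side is empty.
    static void coGroup(List<Event> first, List<Event> second, List<String> out) {
        for (Event left : first) {
            boolean isJoin = false;
            for (Event right : second) {
                isJoin = true;
                out.add("(" + left.key() + "," + left.value() + "," + right.value() + ")");
            }
            if (!isJoin) {
                out.add("(" + left.key() + "," + left.value() + ",null)");
            }
        }
    }

    // Groups the window's records by key (left-side keys only) and calls coGroup per key.
    static List<String> leftJoin(List<Event> left, List<Event> right) {
        Map<String, List<Event>> leftByKey = new LinkedHashMap<>();
        for (Event e : left) {
            leftByKey.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Event>> entry : leftByKey.entrySet()) {
            List<Event> sameKey = right.stream()
                    .filter(r -> r.key().equals(entry.getKey()))
                    .toList();
            coGroup(entry.getValue(), sameKey, out);
        }
        return out;
    }

    public static void main(String[] args) {
        leftJoin(LEFT, RIGHT).forEach(System.out::println);
    }
}
```

Compared with the inner join in section I, the only extra row is (b,6,null): b,6 has no partner in stream 2 but is still emitted.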

V. interval-join

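For two streams a and b, a pair of records is joined when
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
or, equivalently,
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
that is, when the timestamp of b lies between the lower and upper bound relative to the timestamp of a. Both bounds are inclusive by default; lowerBoundExclusive() and upperBoundExclusive() exclude them. Both streams must be keyed, and only records with the same key are joined.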

1. Example

    @Test
    public void intervalJoinTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Stream 1: parse "key,timestamp", assign event time, key by f0
        KeyedStream<Tuple2<String, Integer>, String> stream1 = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1))
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });
        // Stream 2: same format on port 8899
        KeyedStream<Tuple2<String, Integer>, String> stream2 = env.socketTextStream("172.16.10.159", 8899)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] s = value.split(",");
                        return new Tuple2<>(s[0], Integer.parseInt(s[1]));
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f1))
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });
        // Interval-join the two keyed streams
        stream1.intervalJoin(stream2)
                // Event time
                .inEventTime()
                // Bounds relative to the stream 1 timestamp: [-2 ms, +2 ms]
                .between(Time.milliseconds(-2), Time.milliseconds(2))
                // Exclude the lower bound, so the interval becomes (-2 ms, +2 ms]
                .lowerBoundExclusive()
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, Context ctx,
                                               Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                        out.collect(new Tuple3<>(left.f0, left.f1, right.f1));
                    }
                })
                .print("interval-join");
        env.execute("intervalJoinTest");
    }

2. Test

Stream 1 input:

a,5

Stream 2 input:

a,3
a,4
a,6
a,7
a,8

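The bounds are -2 and 2 and the lower bound is excluded, so a record from stream 2 joins a,5 when its timestamp t satisfies 5 - 2 < t <= 5 + 2. a,4, a,6 and a,7 are therefore joined, printing (a,5,4), (a,5,6) and (a,5,7), while a,3 sits on the excluded lower bound and a,8 lies above the upper bound.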
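The bound check can be written out as a plain-Java predicate (illustrative names, no Flink dependency). The two boolean flags stand in for lowerBoundExclusive() and upperBoundExclusive(), and all records are assumed to share the key a. Applied to the test input, it keeps the same stream 2 timestamps as the job.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalJoinSketch {

    // True if bTs lies in [aTs + lower, aTs + upper], with either end optionally excluded.
    static boolean inInterval(long aTs, long bTs, long lower, long upper,
                              boolean lowerInclusive, boolean upperInclusive) {
        long lo = aTs + lower;
        long hi = aTs + upper;
        boolean aboveLower = lowerInclusive ? bTs >= lo : bTs > lo;
        boolean belowUpper = upperInclusive ? bTs <= hi : bTs < hi;
        return aboveLower && belowUpper;
    }

    // Timestamps from stream 2 that join the stream 1 record with timestamp aTs.
    static List<Long> matches(long aTs, List<Long> bTimestamps, long lower, long upper,
                              boolean lowerInclusive, boolean upperInclusive) {
        List<Long> out = new ArrayList<>();
        for (long b : bTimestamps) {
            if (inInterval(aTs, b, lower, upper, lowerInclusive, upperInclusive)) {
                out.add(b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> stream2 = List.of(3L, 4L, 6L, 7L, 8L);
        // between(-2, 2) with lowerBoundExclusive(): (3, 7]
        System.out.println(matches(5, stream2, -2, 2, false, true)); // [4, 6, 7]
        // Both bounds inclusive (the default): [3, 7]
        System.out.println(matches(5, stream2, -2, 2, true, true));  // [3, 4, 6, 7]
    }
}
```

The second call shows that without lowerBoundExclusive() the record a,3 would be joined as well.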

Reproduced from www.mshxw.com
Article URL: https://www.mshxw.com/it/831209.html