栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink中join用法

大数据之flink中join用法

1、将两个流中的数据进行join处理

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1000,A,1
        DataStreamSource leftLines = env.socketTextStream("localhost", 8888);
        //2000,A,2
        DataStreamSource rightLines = env.socketTextStream("localhost", 9999);

        //提取第一个流中数据的EventTime
        DataStream leftWaterMarkStream = leftLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //提取第二个流中数据的EventTime
        DataStream rightWaterMarkStream = rightLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //对第一个流整理成tuple3
        DataStream> leftStream = leftWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //对第二个流整理成tuple3
        DataStream> rightStream = rightWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //第一个流(左流)调用join方法关联第二个流(右流),并且在where方法和equalTo方法中分别指定两个流join的条件
        DataStream> joinedStream = leftStream.join(rightStream)
                .where(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1; //将左流tuple3中的f1作为join的key
                    }
                })
                .equalTo(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1; //将右流tuple3中的f1作为join的key
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5))) //划分EventTime滚动窗口,窗口长度为5秒
                .apply(new MyInnerJoinFunction()); //在apply方法中传入自定义的MyInnerJoinFunction
        joinedStream.print(); //调用print sink 输出结果
        env.execute("TumblingWindowJoinDemo");
    }

}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;

/**
 * Inner-join function that concatenates a left and a right
 * {@code (timestamp, key, value)} tuple into one six-field tuple.
 *
 * <p>Type arguments, in order: left input type, right input type, output type.
 */
public class MyInnerJoinFunction implements JoinFunction<
        Tuple3<Long, String, String>,
        Tuple3<Long, String, String>,
        Tuple6<Long, String, String, Long, String, String>> {

    /**
     * Called for a pair of elements that fall into the same window and share
     * the same join key.
     *
     * @param left  one element of the first (left) stream
     * @param right one element of the second (right) stream
     * @return the fields of {@code left} followed by the fields of {@code right}
     */
    @Override
    public Tuple6<Long, String, String, Long, String, String> join(
            Tuple3<Long, String, String> left,
            Tuple3<Long, String, String> right)
            throws Exception {
        return Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2);
    }
}

2、左外连接

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowLeftOuterJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,A,1
        DataStreamSource leftSteam = env.socketTextStream("localhost", 8888);
        //2000,A,2
        DataStreamSource rightStream = env.socketTextStream("localhost", 9999);

        //提取第一个流中数据的EventTime
        DataStream leftWaterMarkStream = leftSteam
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //提取第二个流中数据的EventTime
        DataStream rightWaterMarkStream = rightStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //对第一个流整理成tuple3
        DataStream> leftTuple = leftWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //对第二个流整理成tuple3
        DataStream> rightTuple = rightWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //第一个流(左流)和第二个流(右流)进行LeftOuterJoin
		//在同一个窗口,并且join的条件相等,第一个流中的数据没join上也输出
        DataStream> joinedStream = leftTuple.coGroup(rightTuple)
                .where(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyLeftOuterJoinFunction());

        joinedStream.print();

        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

public class MyLeftOuterJoinFunction implements CoGroupFunction<
        Tuple3, //左流输入的数据类型
        Tuple3, //右流输入的数据类型
        Tuple6> { //输出的数据类型
		
    @Override
    public void coGroup(Iterable> first,
                        Iterable> second,
                        Collector> out) throws Exception {
        //循环左流的数据,如果有数据说明触发窗口时左流中有数据
        for (Tuple3 left : first) {
            boolean hasJoined = false;
            //循环右流的数据,如果有数据说明触发窗口时右流中有数据,即join上流
            for (Tuple3 right : second) {
                //返回两个流join上的数据
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                hasJoined = true;
            }
            //如果没有join上,只返回左流的数据
            if (!hasJoined) {
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, null, null, null));
            }
        }
    }
}

3、右外连接

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowRightOuterJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,A,1
        DataStreamSource leftSteam = env.socketTextStream("localhost", 8888);
        //2000,A,2
        DataStreamSource rightStream = env.socketTextStream("localhost", 9999);

        //提取第一个流中数据的EventTime
        DataStream leftWaterMarkStream = leftSteam
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //提取第二个流中数据的EventTime
        DataStream rightWaterMarkStream = rightStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //对第一个流整理成tuple3
        DataStream> leftTuple = leftWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //对第二个流整理成tuple3
        DataStream> rightTuple = rightWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //调用coGroup实现left join
        DataStream> joinedStream = leftTuple.coGroup(rightTuple)
                .where(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyRightOuterJoinFunction());

        joinedStream.print();

        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

public class MyRightOuterJoinFunction implements CoGroupFunction<
        Tuple3, //左流输入的数据类型
        Tuple3, //右流输入的数据类型
        Tuple6> { //输出的数据类型
    @Override
    public void coGroup(Iterable> first,
                        Iterable> second,
                        Collector> out) throws Exception {
        //循环右流的数据,如果有数据说明触发窗口时右流中有数据
        for (Tuple3 right : second) {
            boolean hasJoined = false;
            //循环左流的数据,如果有数据说明触发窗口时左流中有数据,即join上流
            for (Tuple3 left : first) {
                //返回两个流join上的数据
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                hasJoined = true;
            }
            //如果没有join上,只返回右流的数据
            if (!hasJoined) {
                out.collect(Tuple6.of(null, null, null, right.f0, right.f1, right.f2));
            }
        }
    }
}

4、interval Join

按 key 相等进行关联,并为每条数据设置可参与 join 的时间范围(即数据的存活区间)

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IntervalJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,A,1
        DataStreamSource leftLines = env.socketTextStream("localhost", 8888);
        //2000,A,2
        DataStreamSource rightLines = env.socketTextStream("localhost", 9999);

        //提取第一个流中数据的EventTime
        DataStream leftWaterMarkStream = leftLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //提取第二个流中数据的EventTime
        DataStream rightWaterMarkStream = rightLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        //对第一个流整理成tuple3
        DataStream> leftStream = leftWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        //对第二个流整理成tuple3
        DataStream> rightStream = rightWaterMarkStream.map(
                new MapFunction>() {
                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        DataStream> joinedStream = leftStream
                .keyBy(t -> t.f1) //指定第一个流分组KeySelector
                .intervalJoin(rightStream.keyBy(t -> t.f1)) //调用intervalJoin方法并指定第二个流的分组KeySelector
                .between(Time.seconds(-1), Time.seconds(1)) //设置join的时间区间范围为当前数据时间±1秒
                .upperBoundExclusive() //默认join时间范围为前后都包括的闭区间,现在设置为前闭后开区间
                .process(new MyProcessJoinFunction()); //调用process方法中传入自定义的MyProcessJoinFunction
        joinedStream.print(); //调用print sink 输出结果
        env.execute("IntervalJoinDemo");
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/743022.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号