栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 双流 join：使用 connect 合并两条流

Flink 双流 join：使用 connect 合并两条流

package com.cn.stream.joins;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;


public class cdjoin {
    private static OutputTag> first = new OutputTag<>("first", new TypeHint>() {
    }.getTypeInfo());
    private static OutputTag> second = new OutputTag<>("second", new TypeHint>() {
    }.getTypeInfo());

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "tesst");

        FlinkKafkaConsumer myConsumer =
                new FlinkKafkaConsumer<>("kafkaS", new SimpleStringSchema(), properties);
        DataStreamSource stringDataStreamSource = env.addSource(myConsumer);

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "tesst");

        FlinkKafkaConsumer myConsumers =
                new FlinkKafkaConsumer<>("kafkaSS", new SimpleStringSchema(), properties);
        DataStreamSource stringDataStreamSource1 = env.addSource(myConsumers);

        KeyedStream, String> tuple3StringKeyedStream = stringDataStreamSource.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                String[] split = value.split(",");

                return new Tuple3(split[0], split[1], split[2]);
            }
        }).keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple3 value) throws Exception {
                return value.f2;
            }
        });
        KeyedStream, String> tuple3StringKeyedStream1 = stringDataStreamSource1.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                String[] split = value.split(",");

                return new Tuple3(split[0], split[1], split[2]);
            }
        }).keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple3 value) throws Exception {
                return value.f0;
            }
        });

        SingleOutputStreamOperator> process = tuple3StringKeyedStream.connect(tuple3StringKeyedStream1).process(new CoProcessFunction, Tuple3, Tuple6>() {
            ValueState> stateFirst = null;
            ValueState> stateSecond = null;
            ValueState Timestate = null;

            
            @Override
            public void processElement1(Tuple3 value, CoProcessFunction, Tuple3, Tuple6>.Context ctx, Collector> out) throws Exception {
                Tuple3 value2 = stateSecond.value();
                //判断第二个数据到来了么 如果到了就直接和 value 添加 一并输出
                if (value2 != null) {
                    // 关联上了 发送数据
                    out.collect(new Tuple6(value.f0, value.f1, value.f2, value2.f0, value2.f1, value2.f2));
                    // 清空 第二个流的数据 保证状态清除 不会造成内存溢出
                    stateSecond.clear();
                    // 关联上了 删除定时器 防止错误发送 测流 不删除 到时间会 发送到测流 但是 数据已经 关联
                    ctx.timerService().deleteProcessingTimeTimer(Timestate.value());
                    //  清除 注册时间的状态  也是防止内存 一直保存
                    Timestate.clear();
                } else {
                    //如果没到 将 此值保存到状态中 保存等待其他 流调用
                    stateFirst.update(value);
                    //获得 当前的处理时间 也就是 系统时间 在此之上 添加 一个 需要多久才能触发 ontimer 的时间 单位是毫秒
                    long l = ctx.timerService().currentProcessingTime();
                    //我这里 时间 用 一个Long 类型的状态去保存 目的是 如果 流关联到到了 及时将这个触发器删除
                    Timestate.update(l + 20000l);
                    // 注册触发器
                    ctx.timerService().registerProcessingTimeTimer(Timestate.value());
                }
            }

            
            @Override
            public void processElement2(Tuple3 value, CoProcessFunction, Tuple3, Tuple6>.Context ctx, Collector> out) throws Exception {
                Tuple3 value1 = stateFirst.value();
                if (value1 != null) {
                    out.collect(new Tuple6(value1.f0, value1.f1, value1.f2, value.f0, value.f1, value.f2));
                    stateFirst.clear();
                    ctx.timerService().deleteProcessingTimeTimer(Timestate.value());
                    Timestate.clear();
                } else {
                    stateSecond.update(value);
                    long l = ctx.timerService().currentProcessingTime();
                    Timestate.update(l + 20000l);
                    ctx.timerService().registerProcessingTimeTimer(Timestate.value());
                }
            }

            
            @Override
            public void onTimer(long timestamp, CoProcessFunction, Tuple3, Tuple6>.onTimerContext ctx, Collector> out) throws Exception {
                Tuple3 value1 = stateFirst.value();
                Tuple3 value2 = stateSecond.value();
                if (value1 != null) {
                    ctx.output(first, value1);
                }
                if (value2 != null) {
                    ctx.output(second, value2);
                }
                stateFirst.clear();
                stateSecond.clear();
            }

            
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor> firstStream = new ValueStateDescriptor<>("firstStream", new TypeHint>() {
                }.getTypeInfo());
                ValueStateDescriptor> secondStream = new ValueStateDescriptor<>("secondStream", new TypeHint>() {
                }.getTypeInfo());
                ValueStateDescriptor timeState = new ValueStateDescriptor<>("timeState", Long.class);
                stateFirst = getRuntimeContext().getState(firstStream);
                stateSecond = getRuntimeContext().getState(secondStream);
                Timestate = getRuntimeContext().getState(timeState);
            }

            @Override
            public void close() throws Exception {
                stateFirst.clear();
                stateSecond.clear();
            }
        });
        process.print();
        process.getSideOutput(first).print("first");
        process.getSideOutput(second).print("second");
        env.execute("Stream join");

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457359.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号