Flink Stream Processing: Transform, Multi-Stream Operations

Split and Select

split (DataStream → SplitStream): splits a DataStream into two or more DataStreams according to some criteria.

select (SplitStream → DataStream): retrieves one or more DataStreams back out of a SplitStream.
package com.atguigu.transfrom;

import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;

public class Test4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Backslashes in a Java string literal must be escaped
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // Parse each CSV line into a SensorReading(id, timestamp, temperature)
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // Tag each element "high" or "low" according to its temperature
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");
        // The selector never emits an "all" tag; selecting both tags yields the full stream
        DataStream<SensorReading> all = split.select("high", "low");

        high.print();
        env.execute();
    }
}
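Note that split/select was deprecated and removed in newer Flink releases (1.12+); the recommended replacement is side outputs. A minimal sketch of the same high/low routing using an OutputTag and a ProcessFunction, assuming the same SensorReading bean and the parsed stream `map` from above (this fragment would replace the split block inside main):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so the generic type survives erasure
final OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};

SingleOutputStreamOperator<SensorReading> highStream =
        map.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
                if (value.getTemperature() > 30) {
                    out.collect(value);          // main output: high readings
                } else {
                    ctx.output(lowTag, value);   // side output: low readings
                }
            }
        });

DataStream<SensorReading> lowStream = highStream.getSideOutput(lowTag);
```

Unlike split, side outputs also allow the side stream to have a different type from the main stream.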
Connect and CoMap

connect (DataStream, DataStream → ConnectedStreams): connects two data streams while preserving their types. After connect, the two streams are merely placed inside one ConnectedStreams; internally each keeps its own data and form unchanged, and the two remain independent of each other.

comap, coflatmap (ConnectedStreams → DataStream): act on a ConnectedStreams, doing the same job as map and flatMap on a single stream: each of the two wrapped streams gets its own map / flatMap function.
package com.atguigu.transfrom;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class Test4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Backslashes in a Java string literal must be escaped
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // Parse each CSV line into a SensorReading(id, timestamp, temperature)
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // Split into high/low streams, as in the previous example
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");

        // Map the high stream to (id, temperature) warning tuples
        SingleOutputStreamOperator<Tuple2<String, Double>> warnStream =
                high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                });

        // Connect the two streams; each side keeps its own type
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warnStream.connect(low);

        // map1 handles the first (warning) stream, map2 the second (low) stream
        SingleOutputStreamOperator<String> outputStreamOperator =
                connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, String>() {
                    @Override
                    public String map1(Tuple2<String, Double> value) throws Exception {
                        // value.getField(1) is a Double, not a String; build the warning text instead
                        return value.f0 + " high temperature warning: " + value.f1;
                    }

                    @Override
                    public String map2(SensorReading value) throws Exception {
                        return value.getId();
                    }
                });

        outputStreamOperator.print();
        env.execute();
    }
}
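The coflatmap variant works the same way, except each side gets a flatMap that may emit zero or more elements through a Collector. A minimal sketch against the same connectedStreams (the emitted strings are illustrative, not part of the original example):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

SingleOutputStreamOperator<String> flatMapped =
        connectedStreams.flatMap(new CoFlatMapFunction<Tuple2<String, Double>, SensorReading, String>() {
            @Override
            public void flatMap1(Tuple2<String, Double> value, Collector<String> out) {
                // First stream: emit two records per warning tuple
                out.collect(value.f0);
                out.collect("warning, temperature: " + value.f1);
            }

            @Override
            public void flatMap2(SensorReading value, Collector<String> out) {
                // Second stream: emit a single record
                out.collect(value.getId());
            }
        });
```

connect handles exactly two streams of possibly different types; to merge many streams of the same type, union is the simpler choice.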