split:
DataStream → SplitStream: splits one DataStream into two or more DataStreams based on some characteristic of the records.
select:
SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.

package com.atguigu.transfrom;
import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Collections;

public class Test4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Forward slashes avoid illegal escape sequences such as "\大" inside a Java string literal
        String path = "D:/大数据组件API/Flink/Flink01/src/main/resources/test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);
        // Each line has the form "id,timestamp,temperature"
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });
        // Tag every record as "high" (temperature above 30) or "low"
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");
        // No record is ever tagged "all"; selecting several tags returns their union
        DataStream<SensorReading> all = split.select("high", "low");
        high.print();
        env.execute();
    }
}
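The split/select semantics can be modeled without a Flink runtime. The plain-Java sketch below is a simplification, not Flink's implementation: the `selector` function plays the role of `OutputSelector`, and `select(...)` keeps a record if at least one of its tags was requested. `SplitSelectSketch`, `Reading` and the sample lines are hypothetical. The sketch also shows why selecting a tag the selector never emits, such as "all", yields nothing. Newer Flink releases deprecate and later drop `split`/`select` in favor of side outputs, so check which version you run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class SplitSelectSketch {
    // Hypothetical stand-in for com.atguigu.bean.SensorReading
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;
        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Same parsing as the map() step: "id,timestamp,temperature"
    static Reading parse(String line) {
        String[] f = line.split(",");
        return new Reading(f[0].trim(), Long.parseLong(f[1].trim()), Double.parseDouble(f[2].trim()));
    }

    // Models split + select: the selector returns the tags for one record (like OutputSelector),
    // and select keeps every record that carries at least one of the requested tags
    static <T> List<T> select(List<T> input, Function<T, Iterable<String>> selector, String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<T> out = new ArrayList<>();
        for (T value : input) {
            for (String tag : selector.apply(value)) {
                if (wanted.contains(tag)) {
                    out.add(value); // add the record once even if several tags match
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not the contents of the tutorial's test.txt
        List<Reading> readings = Arrays.asList(
            parse("sensor_1,1547718199,35.8"),
            parse("sensor_6,1547718201,15.4"),
            parse("sensor_7,1547718202,6.7"));
        Function<Reading, Iterable<String>> bySeverity =
            r -> Collections.singletonList(r.temperature > 30 ? "high" : "low");

        System.out.println(select(readings, bySeverity, "high").size());        // 1
        System.out.println(select(readings, bySeverity, "low").size());         // 2
        System.out.println(select(readings, bySeverity, "high", "low").size()); // 3
        System.out.println(select(readings, bySeverity, "all").size());         // 0
    }
}
```

Returning a collection of tags rather than a single tag is what allows one record to land in several selected streams.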
Connect and CoMap
connect:
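DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After connect, the two streams are merely placed inside the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
coMap, coFlatMap:
ConnectedStreams → DataStream: applied to ConnectedStreams, these work like map and flatMap, applying map or flatMap separately to each stream inside the ConnectedStreams.

package com.atguigu.transfrom;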
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Collections;

public class Test4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Forward slashes avoid illegal escape sequences such as "\大" inside a Java string literal
        String path = "D:/大数据组件API/Flink/Flink01/src/main/resources/test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");
        DataStream<SensorReading> all = split.select("high", "low");
        // Turn high-temperature readings into (id, temperature) warning tuples
        SingleOutputStreamOperator<Tuple2<String, Double>> warnStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        // The connected streams keep their own types: Tuple2<String, Double> and SensorReading
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warnStream.connect(low);
        // map1 handles the first stream and map2 the second; both must return the same type
        SingleOutputStreamOperator<String> outputStreamOperator = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, String>() {
            @Override
            public String map1(Tuple2<String, Double> value) throws Exception {
                // Field 1 is a Double; returning value.getField(1) as a String would throw ClassCastException
                return String.valueOf(value.f1);
            }
            @Override
            public String map2(SensorReading value) throws Exception {
                return value.getId();
            }
        });
        outputStreamOperator.print();
        env.execute();
    }
}
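To make the connect / coMap / coFlatMap description concrete without a Flink runtime, here is a plain-Java model. `Connected` is a hypothetical stand-in for `ConnectedStreams`: each input keeps its own element type, `map` applies one function per input (like `map1`/`map2` of `CoMapFunction`), and `flatMap` lets each side emit zero or more results (like `CoFlatMapFunction`). Unlike Flink, which interleaves the two inputs as records arrive, this sketch simply processes the first list and then the second. All class names and data are illustrative assumptions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class ConnectCoMapSketch {
    // Hypothetical stand-in for SensorReading (only the fields used here)
    static final class Reading {
        final String id;
        final double temperature;
        Reading(String id, double temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    // Hypothetical model of ConnectedStreams<A, B>: two inputs held side by side,
    // each keeping its own element type
    static final class Connected<A, B> {
        final List<A> first;
        final List<B> second;

        Connected(List<A> first, List<B> second) {
            this.first = first;
            this.second = second;
        }

        // CoMap: map1 for the first input, map2 for the second, one shared output type R.
        // Flink interleaves the inputs as records arrive; this sketch drains first, then second.
        <R> List<R> map(Function<A, R> map1, Function<B, R> map2) {
            List<R> out = new ArrayList<>();
            for (A a : first) out.add(map1.apply(a));
            for (B b : second) out.add(map2.apply(b));
            return out;
        }

        // CoFlatMap: each side may emit zero or more results through a collector
        <R> List<R> flatMap(BiConsumer<A, Consumer<R>> flatMap1, BiConsumer<B, Consumer<R>> flatMap2) {
            List<R> out = new ArrayList<>();
            for (A a : first) flatMap1.accept(a, out::add);
            for (B b : second) flatMap2.accept(b, out::add);
            return out;
        }
    }

    public static void main(String[] args) {
        // Hypothetical data: one (id, temperature) warning and two low readings
        Map.Entry<String, Double> warning = new AbstractMap.SimpleEntry<>("sensor_1", 35.8);
        List<Map.Entry<String, Double>> warnings = Collections.singletonList(warning);
        List<Reading> low = Arrays.asList(new Reading("sensor_6", 15.4), new Reading("sensor_7", 6.7));
        Connected<Map.Entry<String, Double>, Reading> connected = new Connected<>(warnings, low);

        // Like the CoMapFunction in the example: temperature for warnings, id for low readings
        List<String> mapped = connected.map(w -> String.valueOf(w.getValue()), r -> r.id);
        System.out.println(mapped); // [35.8, sensor_6, sensor_7]

        // CoFlatMap: two lines per warning, nothing for low readings
        List<String> flat = connected.flatMap(
            (w, out) -> {
                out.accept(w.getKey());
                out.accept("high temp: " + w.getValue());
            },
            (r, out) -> { });
        System.out.println(flat); // [sensor_1, high temp: 35.8]
    }
}
```

The key constraint the model preserves is that both per-side functions must agree on one output type, which is why the CoMap result is a single `List<String>` even though the inputs differ.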



