package com.wjy.api.transform;
import com.wjy.api.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class TransformMutipleStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final DataStream fileDataStreamSource = env.readTextFile("E:\work\baozun\dataX\gitee\gmall\flink-tutorial\src\main\resources\sensor.txt");
//转换成SensorReading
DataStream sensorReading = fileDataStreamSource.map(new MapFunction() {
public SensorReading map(String s) throws Exception {
String[] fields = s.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
}
});
//分流操作,按照温度30
OutputTag high = new OutputTag<>("high", TypeInformation.of(SensorReading.class));
OutputTag low = new OutputTag<>("low", TypeInformation.of(SensorReading.class));
SingleOutputStreamOperator processDataStream = sensorReading.process(new ProcessFunction() {
@Override
public void processElement(SensorReading value, Context ctx, Collector out) throws Exception {
if(value.getTemperature()>30){
ctx.output(high, value);
}else{
ctx.output(low, value);
}
}
});
DataStream highDataStream = processDataStream.getSideOutput(high);
DataStream lowDataStream = processDataStream.getSideOutput(low);
highDataStream.print("high");
lowDataStream.print("low");
env.execute();
}
}