- 1.Source
- 1.1 集合
- 1.2 文件File
- 1.3 kafka
- 1.4 自定义数据源UDF
数据源
bean
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReading {
private String id;
private Long timestamp;
private Double temperature;
}
1.1 集合
env.fromCollection
public class SourceTest01 {
public static void main(String[] args)throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream dataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1111L, 11.1),
new SensorReading("sensor_2", 1411L, 11.2),
new SensorReading("sensor_3", 1211L, 11.3),
new SensorReading("sensor_4", 1215L, 11.6)
));
DataStream streamSource = env.fromElements(1, 2, 3, 4, 5);
dataStream.print("list");
streamSource.print("int");
env.execute();
}
}
1.2 文件File
env.readTextFile
public class SourceTest02_File {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path = "E:\atguiguDemo03\leet-code\flink04_java\src\main\resources\1.txt";
DataStream dataStream = env.readTextFile(path);
dataStream.print("txt");
env.execute();
}
}
或者用反射
public class SourceTest02_File {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
URL resource = SourceTest02_File.class.getResource("/1.txt");
DataStream dataStream = env.readTextFile(resource.getPath());
dataStream.print("txt");
env.execute();
}
}
1.3 kafka
maven
org.apache.flink flink-connector-kafka-0.11_2.121.10.1
env.addSource(new FlinkKafkaConsumer011())
public class SourceTest03_Kafka {
public static void main(String[] args)throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("", "");
DataStream dataStream = env.addSource(new FlinkKafkaConsumer011("sensor", new SimpleStringSchema(), properties));
dataStream.print("txt");
env.execute();
}
}
中间可以设置参数
kafka-console-producer.sh --brocker-list locahost:9092 --topic sensor
public class SourceTest04_UDF {
public static void main(String[] args)throws Exception{
// 自定义数据源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream dataStream = env.addSource(new MySource());
dataStream.print("txt");
env.execute();
}
public static class MySource implements SourceFunction{
private boolean running = true;
@Override
public void run(SourceContext sourceContext) throws Exception {
Random random = new Random();
HashMap hashMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
hashMap.put("sensor_" + (i + 1), random.nextGaussian() * 20 + 60);
}
while (running){
for (String s : hashMap.keySet()) {
Double nextTemp = hashMap.get(s) + random.nextGaussian();
hashMap.put(s, nextTemp);
sourceContext.collect(new SensorReading(s, System.currentTimeMillis(), nextTemp));
}
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}



