
Flink Tutorial 2: Source

Contents
      • 1. Source
        • 1.1 Collections
        • 1.2 File
        • 1.3 Kafka
        • 1.4 Custom source (UDF)

1. Source

A source is where a Flink job reads its input data.

The bean used by the examples in this section:

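import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Sensor reading POJO; Lombok generates the getters, setters, and constructors.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
}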

1.1 Collections

Use env.fromCollection (or env.fromElements) to build a stream from in-memory data:

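import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build a stream from an in-memory collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1111L, 11.1),
                new SensorReading("sensor_2", 1411L, 11.2),
                new SensorReading("sensor_3", 1211L, 11.3),
                new SensorReading("sensor_4", 1215L, 11.6)
        ));

        // Build a stream directly from individual elements
        DataStream<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);

        // The argument to print() is a prefix that labels each output line
        dataStream.print("list");
        streamSource.print("int");

        env.execute();
    }
}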

1.2 File

Use env.readTextFile to read a text file line by line:

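import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Backslashes in a Windows path must be escaped inside a Java string literal
        String path = "E:\\atguiguDemo03\\leet-code\\flink04_java\\src\\main\\resources\\1.txt";
        // Each line of the file becomes one String element of the stream
        DataStream<String> dataStream = env.readTextFile(path);
        dataStream.print("txt");
        env.execute();
    }
}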

Alternatively, resolve the file from the classpath with Class.getResource:

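import java.net.URL;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Look up 1.txt at the root of the classpath (src/main/resources)
        URL resource = SourceTest02_File.class.getResource("/1.txt");
        DataStream<String> dataStream = env.readTextFile(resource.getPath());
        dataStream.print("txt");
        env.execute();
    }
}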
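
readTextFile emits each line of the file as a String, so a later step usually has to turn those lines into SensorReading objects. The contents of 1.txt are not shown in this article; the sketch below assumes each line holds comma-separated fields in the order id, timestamp, temperature (for example sensor_1,1111,11.1). SensorLineParser and its Reading holder are hypothetical stand-ins so the example compiles without Flink or Lombok; in a real job the same parsing logic would sit inside a map function that returns SensorReading.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original tutorial.
class SensorLineParser {

    // Minimal stand-in for the SensorReading bean (no Lombok dependency).
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "Reading(id=" + id + ", timestamp=" + timestamp
                    + ", temperature=" + temperature + ")";
        }
    }

    // Assumed line format: id,timestamp,temperature
    static Reading parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated fields: " + line);
        }
        return new Reading(
                fields[0].trim(),
                Long.parseLong(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    public static void main(String[] args) {
        // Sample lines in the assumed format
        List<String> lines = Arrays.asList("sensor_1,1111,11.1", "sensor_2,1411,11.2");
        for (String line : lines) {
            System.out.println(parse(line));
        }
    }
}
```

Rejecting lines with the wrong number of fields keeps malformed input from silently producing half-filled readings; whether to fail or skip such lines is a design choice for the actual job.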

1.3 Kafka

Maven dependency:

		
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        

Use env.addSource(new FlinkKafkaConsumer011<>(topic, schema, properties)):

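import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class SourceTest03_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        // Broker address, matching the console producer command in this section
        properties.setProperty("bootstrap.servers", "localhost:9092");
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties));
        dataStream.print("txt");
        env.execute();
    }
}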

Consumer parameters can be set on the Properties object before it is passed to the consumer.
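
As an illustration of such parameters, the sketch below fills a Properties object with a few standard Kafka consumer settings. The KafkaSourceConfig class and the group name sensor-consumer-group are made up for this example; the keys bootstrap.servers, group.id, and auto.offset.reset are regular Kafka consumer configuration names, and the broker address is the one used by the console producer in this section.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original tutorial.
class KafkaSourceConfig {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        // Address of the Kafka broker(s) to connect to
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group name; the value used in main below is an invented example
        properties.setProperty("group.id", groupId);
        // Where to start reading when the group has no committed offset
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = consumerProperties("localhost:9092", "sensor-consumer-group");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned object can be passed as the third argument of the FlinkKafkaConsumer011 constructor in place of the inline Properties.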

To send test messages to the topic, start a console producer:

kafka-console-producer.sh --broker-list localhost:9092 --topic sensor


1.4 Custom source (UDF)

Implement SourceFunction and pass an instance to env.addSource:

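import java.util.HashMap;
import java.util.Random;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SourceTest04_UDF {
    public static void main(String[] args) throws Exception {
        // Read from a user-defined source
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> dataStream = env.addSource(new MySource());
        dataStream.print("txt");
        env.execute();
    }

    public static class MySource implements SourceFunction<SensorReading> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            // Initial temperature of 10 sensors, normally distributed around 60
            HashMap<String, Double> hashMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                hashMap.put("sensor_" + (i + 1), random.nextGaussian() * 20 + 60);
            }
            while (running) {
                // Random walk: nudge every sensor's temperature and emit a new reading
                for (String s : hashMap.keySet()) {
                    Double nextTemp = hashMap.get(s) + random.nextGaussian();
                    hashMap.put(s, nextTemp);
                    sourceContext.collect(new SensorReading(s, System.currentTimeMillis(), nextTemp));
                }
                // Emit one batch of readings per second
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}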
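
The run/cancel pair above relies on a flag that one thread sets and another thread reads. The plain-Java sketch below isolates that pattern outside Flink so it can be tried on its own; CancellableGenerator and runFor are hypothetical names, and the 10 ms interval is chosen only to keep the demo short. It is a simplified model of the idea, not Flink's actual threading.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-alone model of the run()/cancel() pattern.
class CancellableGenerator {
    // volatile so the write in cancel() is visible to the thread looping in run()
    private volatile boolean running = true;
    private final List<Double> emitted = Collections.synchronizedList(new ArrayList<>());

    // Emits a random-walk temperature every 10 ms until cancel() is called.
    void run() {
        Random random = new Random();
        double temperature = 60 + random.nextGaussian() * 20;
        while (running) {
            temperature += random.nextGaussian();
            emitted.add(temperature);
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Runs a generator on a worker thread for roughly `millis`, cancels it,
    // waits for the worker to finish, and returns how many values were emitted.
    static int runFor(long millis) {
        CancellableGenerator generator = new CancellableGenerator();
        Thread worker = new Thread(generator::run);
        worker.start();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        generator.cancel();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return generator.emitted.size();
    }

    public static void main(String[] args) {
        System.out.println("values emitted before cancel: " + runFor(100L));
    }
}
```

Without volatile, the looping thread is not guaranteed to observe the updated flag, which is why the flag in a source like MySource is usually declared that way.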
