栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink 动态读取配置文件(flink菜鸟教程)

flink 动态读取配置文件(flink菜鸟教程)

Flink Source

进入flink的数据源大致分为以下几类:

    集合 Collection文件 FileKafkaUDF

一般都是使用前三个source源即可,如果想要使用其他数据源就可以自定义数据源即UDF。

Collection
public static void main(String[] args) throws Exception {
//指定运行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度
      env.setParallelism(1);
//1.使用fromCollection函数
      DataStreamSource inputStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
//2.使用fromElements函数
      DataStreamSource inputStream1 = env.fromElements(1, 2, 3, 4, 5);
//打印输出
      inputStream.print();
      inputStream1.print();
//执行任务
      env.execute();
  }
File
 public static void main(String[] args) throws Exception {
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局并行度
        env.setParallelism(1);
        //读取数据
        String s = "hello.text";
        DataStream inputDataStream = env.readTextFile(s);
        //打印数据
        inputDataStream.print("out");
        //执行任务
        env.execute("jobname");
    }
Kafka
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    //kafka连接参数配置
    Properties prop = new Properties();
    prop.put("bootstrap.servers","hadoop102:9092");//设置需要连接的主机
    prop.put("zookeeper.connect","hadoop102:2181");
    prop.put("group.id","first");
    prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    prop.put("value.serializer","org.apache.kafka.common,serialization.StringSerializer");
    prop.put("auto.offset.rest","latest");
    //通过addSource获取一般数据源
    //注意这里使用了kafka与flink的连接器,如果没有在pom.xml文件中依赖中添加:
//
//    org.apache.flink
//    flink-connector-kafka_${scala.binary.version}
//    ${flink.version}
//
    DataStreamSource inputStream = 
   env.addSource(new FlinkKafkaConsumer("first", new SimpleStringSchema(), prop));//注意获取的数据是String类型.
    
    inputStream.print();
    
    env.execute();
    }
UDF(自定义source源)
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //使用自定义source,需要实现一个SourceFunction,或者他的富函数RichSourceFunction
       DataStreamSource inputStream= env.addSource(new MysourceFunciton());
        inputStream.print();
        env.execute();
    }
    public static class MysourceFunciton extends RichSourceFunction {
        private Integer start;
        @Override
        public void open(Configuration parameters) throws Exception {
            start=1;
        }

        @Override
        public void run(SourceContext ctx) throws Exception {
            while(start!=0){
                ctx.collect("hello wolrd");
            }
        }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771702.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号