栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink 基础demo[一] flink sql 连接kafka demo

flink 基础demo[一] flink sql 连接kafka demo

        使用ddl 用flink 创建表很简单,例子很多,阿里云文档相对通俗易懂,我贴在下面:

创建消息队列Kafka源表 - 实时计算Flink版 - 阿里云

        但是平平无奇的语句,怎么能难得懂踩坑小天才,我设置的kafka发送时间太长了,导致checkpoint不合适。通过设置拯救自己。

  EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().useBlinkPlanner().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // TableEnvironment tableEnv = TableEnvironment.create(settings);
        Configuration configuration = tEnv.getConfig().getConfiguration();

        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        configuration.setString("table.exec.mini-batch.size", "50");
        configuration.setString("table.dml-sync", "false");
        configuration.setString("execution.checkpointing.interval", "10 s");

        // 3.文件path
        String filePath = "172.17.0.1:9092";
        String filePath1 = "172.17.0.1:2181";
        String topic="testflink";
        String group_id="testcccc";
        String startup="latest-offset";

        String ddl = "create table mytest1 (n" +
        " id Int,n" +
        " hf Double,n" +
        "proctime AS PROCTIME()n" +
        ") WITH (n" +
        " 'connector' = 'kafka',n"+
        "'scan.startup.mode'='earliest-offset',n"+
        " 'topic' = '"+topic+"',n" +
        " 'properties.bootstrap.servers' = '"+filePath+"',n" +
        " 'properties.zookeeper.connect' = '"+filePath1+"',n" +
                "'json.ignore-parse-errors'='false',n"+
        " 'format' = 'json" + "'n" +
        ")";


        // 5.创建一个带eventtime字段的表
        tEnv.executeSql(ddl);

        // 6.一段sql语句
        String sql  = "SELECt id,hf,proctime " +
                "  FROM mytest1 n";
        // 6.执行该sql
        Table table = tEnv.sqlQuery(sql);
//         输出方式1
        TableResult ss=table.execute();
        ss.print();

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/317318.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号