栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkSQL Table API和SQL(二)

FlinkSQL Table API和SQL(二)

一、回顾

上节简单介绍了Table API和SQL,tableEnv环境的创建、Table创建、以及Table API和SQL的简单查询等。这一节将深入一点,介绍和使用Table API和SQL连接外部文件系统,进行source和sink的简单操作。

二、连接外部文件系统读取数据,混用 Table API 和 SQL 1、混用 Table API 和 SQL :

Table API 和 SQL 查询的混用非常简单因为它们都返回 Table 对象:

可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。在 TableEnvironment 中注册的结果表可以在 SQL 查询的 FROM 子句中引用,通过这种方法就可以在 Table API 查询的结果上定义 SQL 查询。 2、source端连接外部文件系统: 2.1、source端使用Table API 的 Connector Tables连接外部文件系统:

使用 tableEnv.connect(new FileSystem().path(path)) 来连接文件系统;使用 withFormat(new Csv()) 定义以csv格式进行数据格式化;使用 withSchema(new Schema()) 定义表结构使用 field(“id”, DataTypes.STRING()) 定义表字段和类型;使用 createTemporaryTable(“inputTable”) 创建表;

核心示例代码如下:

//读取文件的路径
String path = "src/main/resources/sensor.txt";

tableEnv.connect(new FileSystem().path(path))       //定义到文件系统的连接
                .withFormat(new Csv())                      //定义以csv格式进行数据格式化
                .withSchema(new Schema()                    //定义表结构
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");   //创建临时表

//创建表inputTable
Table inputTable = tableEnv.from("inputTable");
//打印表结构
inputTable.printSchema();

混用Table API 和 SQL 读取文件系统的数据,进行简单查询转换、聚合统计:

 public static void main(String[] args) throws Exception {
        // 1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、设置并行度
        env.setParallelism(1);

        // 3、创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 4、表的创建:连接外部文件系统,读取数据
        // 4.1读取文件
        String path = "src/main/resources/sensor.txt";
        tableEnv.connect(new FileSystem().path(path))       //定义到文件系统的连接
                .withFormat(new Csv())                      //定义以csv格式进行数据格式化
                .withSchema(new Schema()                    //定义表结构
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");   //创建临时表

        //创建表inputTable
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();

        // 5、查询转换
        // 5.1 Table API
        // 使用Table API 进行简单查询
        Table resultTable = inputTable.select($("id"), $("temp"))
                .filter($("id").isEqual("sensor_6"));
        // 使用Table API 进行聚合统计
        Table aggTable = inputTable.groupBy($("id"))
                .select($("id"), $("id").count().as("count"), $("temp").avg().as("avgTemp"));

 		//旧的Table API
        


        // 5.2 SQL
        Table sqlResultTable = tableEnv.sqlQuery("select id , temp from inputTable where id='sensor_1'");

        Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt , avg(temp) as avgTemp  from inputTable group by id");

        // 6.打印输出
        //Table API查询结果输出
        tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
        //利用撤回流方式输出
        tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
        //SQL查询结果输出
        tableEnv.toAppendStream(sqlResultTable,Row.class).print("sqlResultTable");
        // 利用撤回流方式输出
       	tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlAggTable");  

        env.execute();
    }

2.2、source端使用SQL的 Connector Tables连接外部文件系统:

使用SQL连接外部系统首先需要编写DDL语句创建表,示例如下:

		String filePath = "src/main/resources/inputTable.txt";
        //创建连接外部文件系统的inputTable表
        String ddl =
                "create table inputTable (n" +
                        " id STRING,n" +
                        " ts BIGINT,n" +
                        " temp DOUBLEn" +
                        ") WITH (n" +
                        " 'connector.type' = 'filesystem',n" +
                        " 'connector.path' = '" + filePath + "',n" +
                        " 'format.type' = 'csv'n" +
                        ")";
        //执行建表语句
        tableEnv.executeSql(ddl);

如此,就在catalog中注册了inputTable这张表。

3、sink端连接外部文件系统:

source端读取数据可使用Table API 的 Connector Tables,也可使用SQL执行DDL建表语句;同理sink输出端也可使用这两种方式:

3.1 sink端使用Table API 的 Connector Tables连接外部文件系统:

注意: 写入到文件有局限,只能是批处理,且只能是追加写,不能是更新式的随机写。

 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 1.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2.表的创建:连接外部系统,读取数据
        String inputPath = "src/main/resources/sensor.txt";
        tableEnv.connect(new FileSystem().path(inputPath))       //定义到文件系统的连接
                .withFormat(new Csv())                      //定义以csv格式ji进行数据格式化
                .withSchema(new Schema()                    //定义表结构
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");   //创建临时表

        Table inputTable = tableEnv.from("inputTable");

        // 3.查询id=sensor_1的数据
        Table resultTable = inputTable.select($("id"), $("temp"))
                .filter($("id").isEqual("sensor_1"));

        // 4.输出到文件
        // 连接外部文件注册输出表
        String outputPath = "src/main/resources/output.txt";
        tableEnv.connect(new FileSystem().path(outputPath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temperature", DataTypes.DOUBLE()))
                .createTemporaryTable("outputTable");

        resultTable.insertInto("outputTable");

        tableEnv.execute("");
    }
3.2 sink端使用SQL Connector Tables连接外部文件系统:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 1.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2.表的创建:连接外部系统,读取数据
        String inputPath = "src/main/resources/sensor.txt";
        String inputTableDDL =
                "create table inputTable (n" +
                        " id STRING,n" +
                        " ts BIGINT, n" +
                        " temp DOUBLEn" +
                        ") WITH (n" +
                        " 'connector.type' = 'filesystem',n" +
                        " 'connector.path' = '" + inputPath + "',n" +
                        " 'format.type' = 'csv'n" +
                        ")";

        //执行source建表语句
        tableEnv.executeSql(inputTableDDL);


        // 3.输出到文件
        // 连接外部文件注册输出表
        String outputPath = "src/main/resources/outputTable.txt";
        //创建输出到外部文件系统的outputTable表
        String outputTableDDL =
                "create table outputTable (n" +
                        " id STRING,n" +
                        " temp DOUBLEn" +
                        ") WITH (n" +
                        " 'connector.type' = 'filesystem',n" +
                        " 'connector.path' = '" + outputPath + "',n" +
                        " 'format.type' = 'csv'n" +
                        ")";
        //执行sink建表语句
        tableEnv.executeSql(outputTableDDL);
        
        // 将表inputTable的查询结果输出到outputTable
        tableEnv.executeSql(
                "INSERT INTO outputTable " +
                        "SELECT id , temp FROM inputTable WHERe id='sensor_1' ");

        tableEnv.execute("");
    }

Flink1.11及以后的版本执行如上任务会报这个异常:

log4j:WARN No appenders could be found for logger (org.apache.calcite.sql.parser).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:41)
	at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:50)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1276)
	at org.example.flink.table_sql_api.TableTest4_FileOutput_SQL.main(TableTest4_FileOutput_SQL.java:67)

原因是Flink 1.11版本对Table&SQL模块进行了重构,废弃了TableEnvironment.sqlUpdate方法,引入了TableEnvironment.executeSql、StatementSet.execute方法。

由于1.10的习惯 在程序最后一般都会调用TableEnvironment.execute()或StreamExecutionEnvironment.execute()方法。

问题就在这里,由于executeSql已经是异步提交了作业,生成Transformation后会把缓存的Operation清除,见TableEnvironmentImpl#translateAndClearBuffer,执行execute也会走那一段逻辑,报了上面异常,但是这个异常不影响程序执行。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711135.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号