上节简单介绍了Table API和SQL,tableEnv环境的创建、Table创建、以及Table API和SQL的简单查询等。这一节将深入一点,介绍和使用Table API和SQL连接外部文件系统,进行source和sink的简单操作。
二、连接外部文件系统读取数据,混用 Table API 和 SQL 1、混用 Table API 和 SQL :Table API 和 SQL 查询的混用非常简单因为它们都返回 Table 对象:
可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。在 TableEnvironment 中注册的结果表可以在 SQL 查询的 FROM 子句中引用,通过这种方法就可以在 Table API 查询的结果上定义 SQL 查询。 2、source端连接外部文件系统: 2.1、source端使用Table API 的 Connector Tables连接外部文件系统:
使用 tableEnv.connect(new FileSystem().path(path)) 来连接文件系统;使用 withFormat(new Csv()) 定义以csv格式进行数据格式化;使用 withSchema(new Schema()) 定义表结构使用 field(“id”, DataTypes.STRING()) 定义表字段和类型;使用 createTemporaryTable(“inputTable”) 创建表;
核心示例代码如下:
//读取文件的路径
String path = "src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(path)) //定义到文件系统的连接
.withFormat(new Csv()) //定义以csv格式进行数据格式化
.withSchema(new Schema() //定义表结构
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable"); //创建临时表
//创建表inputTable
Table inputTable = tableEnv.from("inputTable");
//打印表结构
inputTable.printSchema();
混用Table API 和 SQL 读取文件系统的数据,进行简单查询转换、聚合统计:
public static void main(String[] args) throws Exception {
// 1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、设置并行度
env.setParallelism(1);
// 3、创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4、表的创建:连接外部文件系统,读取数据
// 4.1读取文件
String path = "src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(path)) //定义到文件系统的连接
.withFormat(new Csv()) //定义以csv格式进行数据格式化
.withSchema(new Schema() //定义表结构
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable"); //创建临时表
//创建表inputTable
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
// 5、查询转换
// 5.1 Table API
// 使用Table API 进行简单查询
Table resultTable = inputTable.select($("id"), $("temp"))
.filter($("id").isEqual("sensor_6"));
// 使用Table API 进行聚合统计
Table aggTable = inputTable.groupBy($("id"))
.select($("id"), $("id").count().as("count"), $("temp").avg().as("avgTemp"));
//旧的Table API
// 5.2 SQL
Table sqlResultTable = tableEnv.sqlQuery("select id , temp from inputTable where id='sensor_1'");
Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt , avg(temp) as avgTemp from inputTable group by id");
// 6.打印输出
//Table API查询结果输出
tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
//利用撤回流方式输出
tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
//SQL查询结果输出
tableEnv.toAppendStream(sqlResultTable,Row.class).print("sqlResultTable");
// 利用撤回流方式输出
tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlAggTable");
env.execute();
}
2.2、source端使用SQL的 Connector Tables连接外部文件系统:
使用SQL连接外部系统首先需要编写DDL语句创建表,示例如下:
String filePath = "src/main/resources/inputTable.txt";
//创建连接外部文件系统的inputTable表
String ddl =
"create table inputTable (n" +
" id STRING,n" +
" ts BIGINT,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'filesystem',n" +
" 'connector.path' = '" + filePath + "',n" +
" 'format.type' = 'csv'n" +
")";
//执行建表语句
tableEnv.executeSql(ddl);
如此,就在catalog中注册了inputTable这张表。
3、sink端连接外部文件系统:source端读取数据可使用Table API 的 Connector Tables,也可使用SQL执行DDL建表语句;同理sink输出端也可使用这两种方式:
3.1 sink端使用Table API 的 Connector Tables连接外部文件系统:注意: 写入到文件有局限,只能是批处理,且只能是追加写,不能是更新式的随机写。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2.表的创建:连接外部系统,读取数据
String inputPath = "src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(inputPath)) //定义到文件系统的连接
.withFormat(new Csv()) //定义以csv格式ji进行数据格式化
.withSchema(new Schema() //定义表结构
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable"); //创建临时表
Table inputTable = tableEnv.from("inputTable");
// 3.查询id=sensor_1的数据
Table resultTable = inputTable.select($("id"), $("temp"))
.filter($("id").isEqual("sensor_1"));
// 4.输出到文件
// 连接外部文件注册输出表
String outputPath = "src/main/resources/output.txt";
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE()))
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
tableEnv.execute("");
}
3.2 sink端使用SQL Connector Tables连接外部文件系统:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2.表的创建:连接外部系统,读取数据
String inputPath = "src/main/resources/sensor.txt";
String inputTableDDL =
"create table inputTable (n" +
" id STRING,n" +
" ts BIGINT, n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'filesystem',n" +
" 'connector.path' = '" + inputPath + "',n" +
" 'format.type' = 'csv'n" +
")";
//执行source建表语句
tableEnv.executeSql(inputTableDDL);
// 3.输出到文件
// 连接外部文件注册输出表
String outputPath = "src/main/resources/outputTable.txt";
//创建输出到外部文件系统的outputTable表
String outputTableDDL =
"create table outputTable (n" +
" id STRING,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'filesystem',n" +
" 'connector.path' = '" + outputPath + "',n" +
" 'format.type' = 'csv'n" +
")";
//执行sink建表语句
tableEnv.executeSql(outputTableDDL);
// 将表inputTable的查询结果输出到outputTable
tableEnv.executeSql(
"INSERT INTO outputTable " +
"SELECT id , temp FROM inputTable WHERe id='sensor_1' ");
tableEnv.execute("");
}
Flink1.11及以后的版本执行如上任务会报这个异常:
log4j:WARN No appenders could be found for logger (org.apache.calcite.sql.parser). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:41) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:50) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1276) at org.example.flink.table_sql_api.TableTest4_FileOutput_SQL.main(TableTest4_FileOutput_SQL.java:67)
原因是Flink 1.11版本对Table&SQL模块进行了重构,废弃了TableEnvironment.sqlUpdate方法,引入了TableEnvironment.executeSql、StatementSet.execute方法。
由于1.10的习惯 在程序最后一般都会调用TableEnvironment.execute()或StreamExecutionEnvironment.execute()方法。
问题就在这里,由于executeSql已经是异步提交了作业,生成Transformation后会把缓存的Operation清除,见TableEnvironmentImpl#translateAndClearBuffer,执行execute也会走那一段逻辑,报了上面异常,但是这个异常不影响程序执行。



