- 创建环境
- Table API
- 创建表环境
- SQL表的创建
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");
// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECt ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...
创建表环境
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); // ********************** // Blink STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);SQL表的创建
Table revenue = tableEnv.sqlQuery(
"SELECt cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERe cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECt cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERe cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
.io/flowchart.js/



