-- 创建数据源表映射
create table source (
id bigint,
name STRING
)with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/database',
'table-name' = 'table-name', // 这个是mysql中对应的表名
'username' = 'root',
'password' = '123456');
注意:该DDL并不是真正的创建物理表,而是创建了一个与物理表连接的映射关系
-- 创建sink表映射
create table sink(
id bigint,
name STRING
)with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/database',
'table-name' = 'tablename',
'username' = 'root',
'password' = '123456');
-- 将source表中的数据插入到sink表中 insert into sink select * from source;Flink sql API 案例
EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 对已注册的表进行 SQL 查询
// 注册名为 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上执行 SQL 查询,并把得到的结果作为一个新的表
Table result = tableEnv.sqlQuery(
"SELECt product, amount FROM Orders WHERe product LIKE '%Rubber%'");
// 对已注册的表进行 INSERT 操作
// 注册 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上执行 INSERT 语句并向 TableSink 发出结果
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECt product, amount FROM Orders WHERe product LIKE '%Rubber%'");
详细案例请参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/sql/create.html



