Learning Flink SQL (typing along while studying an expert's code):
Part 1: Conversion between streams and tables
Create the execution environments for streams and tables.
In production, the Flink Kafka connector is commonly used for reading and writing:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 1. Register SourceTable: source_sensor
//    (the create-table opening was cut off in the original; it is restored here
//    to match the sink table's schema and the select statement below)
tableEnv.executeSql("create table source_sensor(id string, ts bigint, vc int) with("
+ "'connector' = 'kafka',"
+ "'topic' = 'topic_source_sensor',"
+ "'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',"
+ "'properties.group.id' = 'atguigu',"
+ "'scan.startup.mode' = 'latest-offset',"
+ "'format' = 'csv'"
+ ")");
// 2. Register SinkTable: sink_sensor
tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
+ "'connector' = 'kafka',"
+ "'topic' = 'topic_sink_sensor',"
+ "'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',"
+ "'format' = 'csv'"
+ ")");
// 3. Query data from SourceTable and write it to SinkTable
tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
}
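
The section title promises conversion between streams and tables, which the Kafka read/write job above does not actually demonstrate. Below is a minimal sketch of both directions, assuming Flink 1.13+ with `flink-table-api-java-bridge` on the classpath; the class name, sample rows, and the filter query are illustrative, not from the original post.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamTableConversionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stream -> Table: build a DataStream of Rows (explicit type info,
        // since Row fields cannot be inferred by reflection) and wrap it
        DataStream<Row> stream = env
                .fromElements(
                        Row.of("sensor_1", 1000L, 10),
                        Row.of("sensor_2", 2000L, 20))
                .returns(Types.ROW_NAMED(
                        new String[] {"id", "ts", "vc"},
                        Types.STRING, Types.LONG, Types.INT));
        Table table = tableEnv.fromDataStream(stream);

        // Query the table with SQL (concatenating a Table into the query
        // string registers it under a generated name)
        Table filtered = tableEnv.sqlQuery(
                "select * from " + table + " where id = 'sensor_1'");

        // Table -> Stream: convert the result back and print it
        tableEnv.toDataStream(filtered).print();

        env.execute();
    }
}
```

`fromDataStream`/`toDataStream` is the Flink 1.13+ API; older posts in this style often use the deprecated `toAppendStream`/`toRetractStream` pair instead.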
————————————————
Copyright notice: This is an original article by CSDN blogger 「徐一闪_BigData」, published under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/zznanyou/article/details/121393064



