import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)2、代码实现读取流数据转成表
将结果流转成流
—toRetractStream:更新的表
—toAppendStream:追加的表
object Demo01API {
def main(args: Array[String]): Unit = {
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner() //使用blink的计划器
.inStreamingMode() //使用流模型
.build()
//创建table的环境
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
//构建一个流
val lineDS: DataStream[String] = bsEnv.socketTextStream("master", 8888)
val table: Table = bsTableEnv.fromDataStream(lineDS, $"word")
//将数据集注册成表,表名为words
bsTableEnv.createTemporaryView("words", table)
val countTable: Table = bsTableEnv.sqlQuery(
"""
|select word,count(1) from words group by word
""".stripMargin)
val result: DataStream[(Boolean, Row)] = countTable.toRetractStream[Row]
result.print()
bsEnv.execute()
}
}



