1.从一个文件中导入表结构(Structure)(常用于批计算)(静态)
2.Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件 系统和 Kafka 消息队列等外部系统。
3.从文件中创建 Table(静态表) Flink 允许用户从本地或者分布式文件系统中读取和写入数据,在 Table API 中可以通 过 CsvTableSource 类来创建,只需指定相应的参数即可。但是文件格式必须是 CSV 格式的。 其 他 文 件 格 式 也 支 持 ( 在 Flink 还 有 Connector 的 来 支 持 其 他 格 式 或 者 自 定 义 TableSource)。
package tablesql
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
object TestCreateTableByFile {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val table = StreamTableEnvironment.create(environment, settings)
//读取数据
val source: CsvTableSource = new CsvTableSource("data/statefile.log",
Array[String]("f1", "f2", "f3", "f4", "f5", "f6"),
Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
)
//注册一张表
table.registerTableSource("t_station_log",source)
//打印表结构,或者使用Table API, 需要得到Table对象 API
val t: Table = table.scan("t_station_log")
t.printSchema()
}
}



