栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

TableAPI和SQL之创建Table(一)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

TableAPI和SQL之创建Table(一)

1.从一个文件中导入表结构(Structure)(常用于批计算)(静态)
2.Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件 系统和 Kafka 消息队列等外部系统。
3.从文件中创建 Table(静态表) Flink 允许用户从本地或者分布式文件系统中读取和写入数据,在 Table API 中可以通 过 CsvTableSource 类来创建,只需指定相应的参数即可。但是文件格式必须是 CSV 格式的。 其 他 文 件 格 式 也 支 持 ( 在 Flink 还 有 Connector 的 来 支 持 其 他 格 式 或 者 自 定 义 TableSource)。

package tablesql

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource


object TestCreateTableByFile {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val table = StreamTableEnvironment.create(environment, settings)

    //读取数据
    val source: CsvTableSource = new CsvTableSource("data/statefile.log",
      Array[String]("f1", "f2", "f3", "f4", "f5", "f6"),
      Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
    )

    //注册一张表
    table.registerTableSource("t_station_log",source)
    //打印表结构,或者使用Table API, 需要得到Table对象 API
    val t: Table = table.scan("t_station_log")
    t.printSchema()

  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/683118.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号