Flink Study Notes 12: Common Flink Table API Operations


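Contents
  • Common Table API operations
    • Creating the table environment
    • Creating a table from a DataStream
      • Specifying a primary key
      • Aliasing fields
      • Extracting time attributes (for time windows)
      • Watermark
    • Creating a temporary view
    • Creating a temporary table or a table
    • Query operations
      • Selecting specific columns
      • Deduplicating with distinct
      • Filtering with filter
      • Grouped aggregation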

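Common Table API operations
Creating the table environment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

//build the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

//build the table environment
val tableEnvironmentSettings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, tableEnvironmentSettings)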

  • EnvironmentSettings.newInstance().inStreamingMode(): streaming mode.
    Streaming is the default processing mode of a Flink table environment.
  • EnvironmentSettings.newInstance().inBatchMode(): batch mode.
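For comparison, a batch-mode environment can also be created without the DataStream bridge. This is a minimal sketch, assuming Flink's Table API and a planner are on the classpath; `BatchModeSketch` and `createBatchEnv` are illustrative names, not part of Flink.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Minimal sketch: a pure Table API environment in batch mode.
// No StreamExecutionEnvironment or DataStream bridge is involved here.
object BatchModeSketch {
  def createBatchEnv(): TableEnvironment = {
    val settings = EnvironmentSettings
      .newInstance()
      .inBatchMode()
      .build()
    TableEnvironment.create(settings)
  }

  def main(args: Array[String]): Unit = {
    val tEnv = createBatchEnv()
    // A bounded query: in batch mode the final result is computed once and printed.
    tEnv.executeSql("SELECT 1 + 1 AS two").print()
  }
}
```

Use the plain `TableEnvironment` when a job is written entirely in SQL/Table API; `StreamTableEnvironment` is needed only when converting between tables and DataStreams.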
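Creating a table from a DataStream

Older Flink versions did it as follows:

val inputTable = tableEnv.fromDataStream(datastream,
  $"id",
  $"timestamp",
  $"temperature")

In newer versions this expression-based variant is deprecated; define the table structure with a Schema instead: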

val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING())
    .column("timestamp",DataTypes.BIGINT())
    .column("temperature",DataTypes.DOUBLE())
    .build()
)
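The snippets in these notes assume a `datastream` of a `SensorReading` type whose definition is not shown. Here is a minimal end-to-end sketch, assuming a case class with the three fields used above and Flink's Scala DataStream and Table bridges on the classpath. `SensorReading`, `FromDataStreamSketch` and the sample values are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical record type; the notes use SensorReading without showing its definition.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object FromDataStreamSketch {
  def buildTable(): Table = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // A small bounded stream standing in for a real sensor source.
    val datastream: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_6", 1547718201L, 15.4)
    )

    // Without an explicit Schema, the columns are derived from the case class fields.
    tableEnv.fromDataStream(datastream)
  }

  def main(args: Array[String]): Unit =
    buildTable().printSchema()
}
```

Passing a Schema is therefore only needed to override derived types or to add primary keys, computed columns, or watermarks.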
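Specifying a primary key
Primary key columns must be declared NOT NULL. Flink only supports NOT ENFORCED primary keys, so it does not check the data for uniqueness.
  • Single-column primary key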
val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING().notNull())
    .column("timestamp",DataTypes.BIGINT())
    .column("temperature",DataTypes.DOUBLE())
    .primaryKey("id")
    .build()
)
  • Composite primary key
val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING().notNull())
    .column("timestamp",DataTypes.BIGINT().notNull())
    .column("temperature",DataTypes.DOUBLE())
    .primaryKey("id","timestamp")
    .build()
)
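Aliasing fields
//rename columns by position, in field order
val aliasedTable = inputTable.as("id","timestamp","ctime")

//rename only a specific column
val renamedTable = inputTable.renameColumns($"id" as "sensor_id")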
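A self-contained sketch of the two renaming styles, assuming the same hypothetical `SensorReading` case class; `RenameSketch` and `renamed()` are illustrative names chosen so the resulting schemas can be inspected.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical record type standing in for the notes' SensorReading.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RenameSketch {
  def renamed(): (Table, Table) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val inputTable = tableEnv.fromDataStream(
      env.fromElements(SensorReading("sensor_1", 1547718199L, 35.8)))

    // as(...) renames columns by position, starting from the first column.
    val byPosition = inputTable.as("sensor_id", "ts", "temp")
    // renameColumns(...) renames only the listed columns and keeps the others in place.
    val byName = inputTable.renameColumns($"id" as "sensor_id")
    (byPosition, byName)
  }

  def main(args: Array[String]): Unit = {
    val (byPosition, byName) = renamed()
    byPosition.printSchema()
    byName.printSchema()
  }
}
```

Both calls return new `Table` objects; the original `inputTable` keeps its column names.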
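Extracting time attributes (for time windows)
  • Event time (EventTime). The computed column below converts the `timestamp` field (epoch seconds) into TIMESTAMP(3); it only becomes an event-time attribute once a watermark is declared on it, as in the Watermark section.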
val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING().notNull())
    .column("timestamp",DataTypes.BIGINT().notNull())
    .column("temperature",DataTypes.DOUBLE())
    .primaryKey("id","timestamp")
    .columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")
    .build()
)

The corresponding DDL:

CREATE TABLE sensor(
    `id` STRING NOT NULL,
    `timestamp` BIGINT NOT NULL,
    `temperature` DOUBLE,
    `rowtime` AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),
    CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
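Because FROM_UNIXTIME works in seconds, the rowtime column above has only second precision, and the raw `timestamp` field must hold epoch seconds. The pure-Scala sketch below (not Flink code) models the conversion with `java.time`. It assumes the value is epoch seconds and that the session time zone (`table.local-time-zone`, which recent Flink versions apply to FROM_UNIXTIME) is a fixed offset, modelled by the hypothetical `offset` parameter.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Pure-Scala sketch (no Flink): what the rowtime expression yields for one row,
// assuming `timestamp` holds epoch SECONDS and the session time zone is `offset`.
object RowtimeConversionSketch {
  def toRowtime(epochSeconds: Long, offset: ZoneOffset): LocalDateTime =
    LocalDateTime.ofEpochSecond(epochSeconds, 0, offset)

  def main(args: Array[String]): Unit = {
    println(toRowtime(1547718199L, ZoneOffset.UTC))        // 2019-01-17T09:43:19
    println(toRowtime(1547718199L, ZoneOffset.ofHours(8))) // 2019-01-17T17:43:19
  }
}
```

The same wall-clock value shifts with the session time zone, so a job's `table.local-time-zone` setting affects where event-time windows start and end.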
  • Processing time (ProcessTime)
val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING().notNull())
    .column("timestamp",DataTypes.BIGINT().notNull())
    .column("temperature",DataTypes.DOUBLE())
    .primaryKey("id","timestamp")
    .columnByExpression("procTime","proctime()")
    .build()
)

The corresponding DDL:

CREATE TABLE sensor(
    `id` STRING NOT NULL,
    `timestamp` BIGINT NOT NULL,
    `temperature` DOUBLE,
    `procTime` AS PROCTIME(),
    CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
Watermark
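  • Option 1: define timestamps and watermarks on the DataStream
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

//assignTimestampsAndWatermarks returns a new stream, so keep a reference to it
val streamWithWatermarks = datastream.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
    .withTimestampAssigner(
      new SerializableTimestampAssigner[SensorReading] {
        //the timestamp field is in seconds; Flink expects milliseconds
        override def extractTimestamp(element: SensorReading,
                                      recordTimestamp: Long): Long = {
          element.timestamp * 1000L
        }
      })
)
//then declare the corresponding field as the rowtime attribute of the table
val inputTable2 = tableEnv.fromDataStream(streamWithWatermarks,
  $"id",
  $"timestamp".rowtime,
  $"temperature")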
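The field-expression form used in option 1 is the deprecated API. With the Schema-based API, the record timestamp and watermarks already assigned on the DataStream can be exposed through a metadata column and `SOURCE_WATERMARK()`. A sketch, assuming Flink 1.13+ and the same hypothetical `SensorReading` case class; `SourceWatermarkSketch` is an illustrative name.

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical record type standing in for the notes' SensorReading.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceWatermarkSketch {
  def buildTable(): Table = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Timestamps and watermarks are assigned on the DataStream (seconds -> milliseconds).
    val withWatermarks: DataStream[SensorReading] = env
      .fromElements(SensorReading("sensor_1", 1547718199L, 35.8))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
            override def extractTimestamp(e: SensorReading, recordTimestamp: Long): Long =
              e.timestamp * 1000L
          }))

    tableEnv.fromDataStream(
      withWatermarks,
      Schema.newBuilder()
        // Expose the record timestamp carried by the DataStream as a metadata column...
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        // ...and propagate the DataStream's watermarks instead of generating new ones.
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build())
  }

  def main(args: Array[String]): Unit =
    buildTable().printSchema()
}
```

The physical columns are still derived from the case class; the metadata column `rowtime` is appended after them.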
  • Option 2: Table API
val inputTable = tableEnv.fromDataStream(
  datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING().notNull())
    .column("timestamp",DataTypes.BIGINT().notNull())
    .column("temperature",DataTypes.DOUBLE())
    .primaryKey("id","timestamp")
    .columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")
    .watermark("rowtime","rowtime - interval '5' SECOND ")
    .build()
)

  • Option 3: SQL

Syntax: WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

Here 5 seconds is the maximum allowed delay (out-of-orderness).

CREATE TABLE sensor(
  `id` STRING NOT NULL,
  `timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE,
  `rowtime` AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND,
  CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
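The watermark declaration can be read as "the watermark trails the largest rowtime seen so far by 5 seconds". The pure-Scala model below (not Flink code) illustrates that rule under two simplifying assumptions: a watermark is computed after every row (Flink actually emits them periodically, see `pipeline.auto-watermark-interval`), and a row counts as late when its rowtime is already below the current watermark. `WatermarkModel` and its methods are illustrative names; timestamps are epoch milliseconds.

```scala
// Pure-Scala model of `WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND`.
// Simplification: the watermark is recomputed after every row.
object WatermarkModel {
  val MaxDelayMs = 5000L

  /** For each arriving row: (rowtime, watermark before the row, watermark after the row). */
  def simulate(rowtimes: Seq[Long]): Seq[(Long, Long, Long)] = {
    var maxSeen = Long.MinValue
    rowtimes.map { ts =>
      // Before the first row there is no watermark yet (Long.MinValue, avoiding overflow).
      val before = if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - MaxDelayMs
      maxSeen = math.max(maxSeen, ts)
      (ts, before, maxSeen - MaxDelayMs)
    }
  }

  /** Rows whose rowtime is already below the current watermark when they arrive. */
  def lateRows(rowtimes: Seq[Long]): Seq[Long] =
    simulate(rowtimes).collect { case (ts, before, _) if ts < before => ts }

  def main(args: Array[String]): Unit = {
    val input = Seq(1000L, 7000L, 3000L, 12000L, 6000L)
    simulate(input).foreach(println)
    println(lateRows(input)) // List(6000)
  }
}
```

The row at 3000 ms arrives out of order but within the 5-second bound, so it is not late; the row at 6000 ms arrives after the watermark has reached 7000 ms.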
Creating a temporary view
  • Option 1: Table API
tableEnv.createTemporaryView("sensor_view",datastream,
  Schema
    .newBuilder()
    .column("id",DataTypes.STRING())
    .build()
)
val result = tableEnv.sqlQuery("SELECT * FROM sensor_view")
tableEnv.toDataStream(result).print()
  • Option 2: SQL statement
tableEnv.executeSql(
  """
    |CREATE TEMPORARY VIEW sensor_view
    |AS
    |SELECT id, temperature, rowtime FROM sensor
    |""".stripMargin)
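An existing `Table` object can also be registered as a temporary view, which is what allows the SQL queries below to refer to `inputtable`. A sketch, assuming the hypothetical `SensorReading` case class and the view name `inputtable` taken from those queries; `TableAsViewSketch` is an illustrative name.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical record type standing in for the notes' SensorReading.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TableAsViewSketch {
  def query(): Table = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val inputTable = tableEnv.fromDataStream(
      env.fromElements(SensorReading("sensor_1", 1547718199L, 35.8)))

    // Register an existing Table object under a name that SQL can refer to.
    tableEnv.createTemporaryView("inputtable", inputTable)

    // `timestamp` is a reserved word in Flink SQL, so it is quoted with backticks.
    tableEnv.sqlQuery("SELECT id, `timestamp`, temperature FROM inputtable")
  }

  def main(args: Array[String]): Unit =
    query().execute().print()
}
```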
Creating a temporary table or a table
  • Option 1: Table API

For the full set of connector options, see the article on Table API connector operations.

//createTemporaryTable returns nothing; it registers the table under the given name
tableEnv.createTemporaryTable("sensor_temp_table",
  TableDescriptor
    .forConnector("filesystem")
    .schema(Schema.newBuilder()
      .column("id",DataTypes.STRING())
      .column("temperature",DataTypes.DOUBLE())
      .build())
    .format("csv")
    //a file URI avoids invalid escape sequences such as \L in a Scala string
    .option("path","file:///D:/LearnWorkSpace/FlinkDemo/src/main/resources/out")
    .build()
)
  • Option 2: SQL
CREATE TEMPORARY TABLE sensor_temp_table (
    id STRING NOT NULL,
    temperature DOUBLE
) WITH (
'connector'='filesystem',
'format'='csv',
'path'='file:///filepath/'
...
)

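Query operations
Selecting specific columns
  • Option 1: Table API
val result = inputTable.select($"id",$"timestamp",$"temperature",$"rowtime")
tableEnv.toDataStream(result).print()
  • Option 2: SQL (inputtable must first be registered, e.g. with tableEnv.createTemporaryView("inputtable", inputTable); `timestamp` is a reserved word and needs backticks)
SELECT id,`timestamp`,temperature,rowtime FROM inputtable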
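Deduplicating with distinct
  • Option 1: Table API
//DISTINCT over an unbounded stream yields an updating result, so convert it with toChangelogStream
val result = inputTable.select($"id",$"timestamp",$"temperature",$"rowtime").distinct()
tableEnv.toChangelogStream(result).print()
  • Option 2: SQL
SELECT DISTINCT id,`timestamp`,temperature,rowtime FROM inputtable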
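On an unbounded stream, DISTINCT can never "finish": it has to remember every row it has already emitted so that later duplicates can be dropped. The pure-Scala model below (not Flink code) makes that state explicit. In Flink this set lives in managed state and keeps growing unless a state TTL (for example the `table.exec.state.ttl` option) is configured. `StreamingDistinctModel` and its `Row` type are illustrative.

```scala
import scala.collection.mutable

// Pure-Scala model (not Flink code): a streaming DISTINCT must remember every row it
// has already emitted in order to drop later duplicates.
object StreamingDistinctModel {
  final case class Row(id: String, timestamp: Long, temperature: Double)

  def distinctStream(input: Iterator[Row]): Iterator[Row] = {
    val seen = mutable.HashSet.empty[Row] // grows with the number of distinct rows
    input.filter(seen.add)                // add(...) returns false if the row was already present
  }

  def main(args: Array[String]): Unit = {
    val r1 = Row("sensor_1", 1547718199L, 35.8)
    val r2 = Row("sensor_6", 1547718201L, 15.4)
    // The second occurrence of r1 is suppressed.
    distinctStream(Iterator(r1, r2, r1)).foreach(println)
  }
}
```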
Filtering with filter
  • .filter calls can be chained; multiple filters are combined with AND.
val result = inputTable.select($"id",$"timestamp",$"temperature",$"rowtime")
  .filter($"id" === "sensor_1")
  .filter($"temperature" >= 40.8)
  .distinct()
tableEnv.toChangelogStream(result).print()
  • A single .filter can also hold several conditions; in the Scala API, join them with || (or) and && (and).
val result = inputTable.select($"id",$"timestamp",$"temperature",$"rowtime")
  .filter(($"id" === "sensor_1") || ($"id" === "sensor_2"))
  .distinct()
tableEnv.toChangelogStream(result).print()
  • SQL
SELECT DISTINCT id,`timestamp`,temperature,rowtime FROM inputtable
WHERE id = 'sensor_1' OR id = 'sensor_2'

Then run the query with tableEnv.sqlQuery(sqlStatement).
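The claim that chained filters behave like a single AND-ed filter can be checked with a small pure-Scala model (not Flink code). `Reading`, `isSensor1` and `isHot` are illustrative names mirroring the two conditions in the first filter example.

```scala
// Pure-Scala model: chaining two filters keeps exactly the rows that one filter
// with both conditions joined by AND keeps.
object FilterChainingModel {
  final case class Reading(id: String, temperature: Double)

  val isSensor1: Reading => Boolean = _.id == "sensor_1"
  val isHot: Reading => Boolean = _.temperature >= 40.8

  // Mirrors .filter($"id" === "sensor_1").filter($"temperature" >= 40.8)
  def chained(rows: Seq[Reading]): Seq[Reading] = rows.filter(isSensor1).filter(isHot)

  // Mirrors a single filter with both conditions joined by &&
  def combined(rows: Seq[Reading]): Seq[Reading] = rows.filter(r => isSensor1(r) && isHot(r))

  def main(args: Array[String]): Unit = {
    val rows = Seq(Reading("sensor_1", 41.0), Reading("sensor_1", 30.0), Reading("sensor_2", 45.0))
    println(chained(rows))
    println(combined(rows))
  }
}
```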

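Grouped aggregation
  • Option 1: Table API
//a streaming group-by aggregate revises earlier results, so convert it with toChangelogStream
val result = inputTable
  .groupBy($"id")
  .select($"id", $"id".count as "cnt")
tableEnv.toChangelogStream(result).print()
  • Option 2: SQL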
SELECT id,count(id) as cnt FROM inputtable
group by id
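A streaming group-by count cannot be emitted as a plain append-only stream, because each new row for an existing id revises that id's previous count. The pure-Scala model below (not Flink code) shows the resulting changelog, labelled with Flink's +I / -U / +U row kinds. This is also why such a result needs `toChangelogStream` rather than `toDataStream`, which accepts only insert-only tables. `GroupCountChangelogModel` and its case classes are illustrative.

```scala
import scala.collection.mutable

// Pure-Scala model of the changelog produced by `SELECT id, COUNT(id) ... GROUP BY id`
// on a stream: the first row per key is an insert, later rows retract and replace the count.
object GroupCountChangelogModel {
  sealed trait Change
  final case class Insert(id: String, cnt: Long) extends Change       // +I
  final case class UpdateBefore(id: String, cnt: Long) extends Change // -U
  final case class UpdateAfter(id: String, cnt: Long) extends Change  // +U

  def countChangelog(ids: Seq[String]): Seq[Change] = {
    val counts = mutable.Map.empty[String, Long]
    ids.flatMap { id =>
      counts.get(id) match {
        case None =>
          counts(id) = 1L
          Seq(Insert(id, 1L))
        case Some(old) =>
          counts(id) = old + 1
          Seq(UpdateBefore(id, old), UpdateAfter(id, old + 1))
      }
    }
  }

  def main(args: Array[String]): Unit =
    countChangelog(Seq("sensor_1", "sensor_2", "sensor_1")).foreach(println)
}
```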