栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink sql与api中时间操作

flink sql与api中时间操作

1、处理时间(Processing Time)

基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。 所以, Table 可以提供一个逻辑上的时间字段,用于在表处理程序中, 指示时间和访问相应的时间戳。

(1) DataStream 转化成 Table 时指定
  // 定义DataStream,并map称样例类
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
    val ds: DataStream[_02_SensorReading] = inputStream.map(
      data => {
        val arr = data.split(",")
        (_02_SensorReading(arr(0), arr(1).toLong, arr(2).toDouble))
      }
    )
    // 创建表的执行环境
    val tabEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tabEnvSettings)
    // 将dataStream转换为Table,并指定时间字段
    val table: Table = tabEnv.fromDataStream(ds,'id,'timestamp,'temperature,'pt.proctime)
(2)定义Table Schema时候指定
// 2)定义Table Schema时候指定
    tabEnv.connect(
      new FileSystem().path("sensor.txt")
     ).withFormat( new Csv())
      .withSchema(
        new Schema()
          .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .field("temperature",DataTypes.DOUBLE())
          .field("pt",DataTypes.TIMESTAMP(3)).proctime() // 指定pt字段为处理时间
      )// 定义表结构
      .createTemporaryTable("inputTable") // 创建临时表
(3)创建表的DDL中指定
    // 3)创建表的DDL中指定
    val sinkDDL:String =
      """
        |
        |create table dataTable (
        |  id varchar(20) not null,
        |  ts bigint,
        |  temperature double,
        |  pt AS PROCTIME()
        | ) with (
        |  'connector.type' = 'filesystem',
        |  'connector.path' = 'file:///sensor.txt',
        |  'format.type' = 'csv'
        |""".stripMargin

    tabEnv.sqlUpdate(sinkDDL) // 执行DDL 注意: 运行这段 DDL,必须使用 Blink Planner
2、Event Time

为了处理无序事件,并区分流中的准时和迟到事件; Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。

(1) DataStream 转化成 Table 时指定
 // 1)DataStream 转换成Table时候指定
    // 注意: 必须在转换的数据流中分配时间戳和水位线
    // 根据指定的.rowtime字段名是否存在,分两种情况
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
    val ds: DataStream[_02_SensorReading] = inputStream.map(
      data => {
        val arr = data.split(",")
        _02_SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    ).assignAscendingTimestamps(_.timestamp * 1000L)
    val tabSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tabSettings)
    // 将DataStream转换为Table,并指定时间字段
    val table1: Table = tabEnv.fromDataStream(ds,'id,'timestamp.rowtime,'temperature)
    // 直接追加字段
    val table2: Table = tabEnv.fromDataStream(ds,'id,'timestamp,'temperature,'rt.rowtime)
(2)定义Table Schema时候指定
// 2) 定义Table Schema时指定
    tabEnv.connect(
      new FileSystem().path("")
    ).withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .field("temperature",DataTypes.DOUBLE())
          .rowtime(
            new Rowtime()
              .timestampsFromField("timestamp") // 从字段中提取时间戳
              .watermarksPeriodicBounded(1000) //watermark延迟一秒
          )
      )// 定义表结构
      .createTemporaryTable("inputTable")
(3)创建表的DDL中指定
// 3) 创建表的 DDL 中指定
    val sinkDDL: String =
      """
        |create table dataTable (
        | id varchar(20) not null,
        | ts bigint,
        | temperature double,
        | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
        | watermark for rt as rt - interval '1' second
        |) with ( 'connector.type' = 'filesystem',
        | 'connector.path' = 'file:///D:\..\sensor.txt',
        | 'format.type' = 'csv'
        |)
     """.stripMargin
    tabEnv.sqlUpdate(sinkDDL) // 执行 DDL
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350090.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号