基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。 所以, Table 可以提供一个逻辑上的时间字段,用于在表处理程序中, 指示时间和访问相应的时间戳。
(1) DataStream 转化成 Table 时指定 // 定义DataStream,并map称样例类
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val ds: DataStream[_02_SensorReading] = inputStream.map(
data => {
val arr = data.split(",")
(_02_SensorReading(arr(0), arr(1).toLong, arr(2).toDouble))
}
)
// 创建表的执行环境
val tabEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tabEnvSettings)
// 将dataStream转换为Table,并指定时间字段
val table: Table = tabEnv.fromDataStream(ds,'id,'timestamp,'temperature,'pt.proctime)
(2)定义Table Schema时候指定
// 2)定义Table Schema时候指定
tabEnv.connect(
new FileSystem().path("sensor.txt")
).withFormat( new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
.field("pt",DataTypes.TIMESTAMP(3)).proctime() // 指定pt字段为处理时间
)// 定义表结构
.createTemporaryTable("inputTable") // 创建临时表
(3)创建表的DDL中指定
// 3)创建表的DDL中指定
val sinkDDL:String =
"""
|
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| pt AS PROCTIME()
| ) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = 'file:///sensor.txt',
| 'format.type' = 'csv'
|""".stripMargin
tabEnv.sqlUpdate(sinkDDL) // 执行DDL 注意: 运行这段 DDL,必须使用 Blink Planner
2、Event Time
为了处理无序事件,并区分流中的准时和迟到事件; Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
(1) DataStream 转化成 Table 时指定 // 1)DataStream 转换成Table时候指定
// 注意: 必须在转换的数据流中分配时间戳和水位线
// 根据指定的.rowtime字段名是否存在,分两种情况
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val ds: DataStream[_02_SensorReading] = inputStream.map(
data => {
val arr = data.split(",")
_02_SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
).assignAscendingTimestamps(_.timestamp * 1000L)
val tabSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tabSettings)
// 将DataStream转换为Table,并指定时间字段
val table1: Table = tabEnv.fromDataStream(ds,'id,'timestamp.rowtime,'temperature)
// 直接追加字段
val table2: Table = tabEnv.fromDataStream(ds,'id,'timestamp,'temperature,'rt.rowtime)
(2)定义Table Schema时候指定
// 2) 定义Table Schema时指定
tabEnv.connect(
new FileSystem().path("")
).withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
.rowtime(
new Rowtime()
.timestampsFromField("timestamp") // 从字段中提取时间戳
.watermarksPeriodicBounded(1000) //watermark延迟一秒
)
)// 定义表结构
.createTemporaryTable("inputTable")
(3)创建表的DDL中指定
// 3) 创建表的 DDL 中指定
val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
| watermark for rt as rt - interval '1' second
|) with ( 'connector.type' = 'filesystem',
| 'connector.path' = 'file:///D:\..\sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tabEnv.sqlUpdate(sinkDDL) // 执行 DDL



