OVER聚合: 有序行范围内的每个输入行计算聚合值
- 与GROUP BY聚合相比,OVER聚合不会将每个组的结果行数减少到一行
- OVER聚合为每个输入行生成新的列,表示聚合值
SELECt
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
注意: WINDOW子句可以用来在SELECt子句之外定义一个OVER窗口,这样使查询更具有可读性,也允许对多个聚合重用窗口定义。
示例:
-- 下面的查询为每个订单计算在当前订单之前一小时内收到的同一产品的所有订单的金额总和。
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
-- 下面的查询为每个订单计算在当前订单之前一小时内收到的同一产品的所有订单的金额总和 及 平均金额。
SELECt order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
2.1. ORDER BY
OVER窗口定义在一个有序的行序列集合上。
由于表没有固有的顺序,order by子句是必须的。
对于流式查询,Flink目前只支持按升序时间属性顺序定义的OVER窗口,其它顺序不支持。
OVER窗口可以在分区表上定义。
如果存在PARTITION BY子句,对于每个输入行,只在其分区的行上计算聚合。相当于基于分区列 进行分组计算聚合。
范围定义指定聚合中包含多少行。使用BETWEEN子句定义范围,该子句定义下限和上限。这些边界之间的所有行都包含在聚合中。 Flink只支持CURRENT ROW作为上边界。
有两种方式进行范围定义: ROWS intervals 、 RANGE intervals.
2.3.1. RANGE intervalsRANGE intervals定义在ORDER BY列的值上,对于Flink,这列始终是一个时间属性
--下面的RANGE intervals定义时间属性最多比当前行少30分钟的所有行都包含在聚合中。 RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW2.3.2. ROW intervals
ROW intervals是一个基于计数的intervals。它确切地定义了在聚合中包含多少行。
--下面的ROWS intervals定义当前行之前的10行和当前行(总共11行)包含在聚合中。
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
--示例SQL
SELECt bidtime, price, item,supplier_id,
max(price) OVER
PARTITION BY item
ORDER BY bidtime
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS max_price
FROM Bid
3. 源码示例
3.1. 源码
package com.streaming.flink.tvf;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TVFSQLExample {
private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);
public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
DataStream dataStream =
env.fromElements(
new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:06",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:08",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier3"),
new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "B", "supplier4"),
new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "B", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:15",dateTimeFormatter), new BigDecimal("3.00"), "B", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "B", "supplier5"));
Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("bidtime", DataTypes.TIMESTAMP(3))
.column("price", DataTypes.DECIMAL(10,2))
.column("item", DataTypes.STRING())
.column("supplier_id", DataTypes.STRING())
.watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
.build());
table.printSchema();
tEnv.createTemporaryView("Bid",table);
String sql = "SELECt bidtime, price, item,supplier_id," +
" SUM(price) OVER (" +
" PARTITION BY item " +
" ORDER BY bidtime " +
// " RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW" +
" ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
" ) AS max_sum" +
" FROM Bid";
TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("Bid");
}catch (Exception e){
LOG.error(e.getMessage(),e);
}
}
//必须是静态类,且属性
public static class Bid{
public LocalDateTime bidtime;
public BigDecimal price;
public String item;
public String supplier_id;
public Bid() {}
public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
this.bidtime = bidtime;
this.price = price;
this.item = item;
this.supplier_id = supplier_id;
}
}
}
3.2. 输出结果
| op | bidtime | price | item | supplier_id | max_sum | +----+-------------------------+--------------+------+-------------+----------------+ | +I | 2020-04-15 08:05:00.000 | 4.00 | A | supplier1 | 4.00 | | +I | 2020-04-15 08:06:00.000 | 4.00 | A | supplier2 | 8.00 | | +I | 2020-04-15 08:07:00.000 | 2.00 | A | supplier1 | 10.00 | | +I | 2020-04-15 08:08:00.000 | 2.00 | A | supplier3 | 12.00 | | +I | 2020-04-15 08:09:00.000 | 5.00 | B | supplier4 | 5.00 | | +I | 2020-04-15 08:11:00.000 | 2.00 | B | supplier3 | 7.00 | | +I | 2020-04-15 08:13:00.000 | 1.00 | B | supplier1 | 8.00 | | +I | 2020-04-15 08:15:00.000 | 3.00 | B | supplier2 | 11.00 | | +I | 2020-04-15 08:17:00.000 | 6.00 | B | supplier5 | 12.00 |



