Window TVF aggregation means grouped aggregation over the relation produced by applying a windowing TVF (table-valued function). The syntax is as follows:
SELECT ...
FROM ...   -- relation to which the windowing TVF was applied
GROUP BY window_start, window_end, ...
A window aggregation is defined in the GROUP BY clause using the window_start and window_end columns of the new relation produced by the windowing TVF.
Just like a query with a regular GROUP BY, a window-aggregation query computes a single result row per group.
Flink supports TUMBLE, HOP, and CUMULATE window aggregations; for usage examples see: flink–Window TVF【窗口化表值函数】讲解及源码示例
SQL examples:
-- TUMBLE
SELECT window_start, window_end, SUM(price)
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

-- HOP
SELECT window_start, window_end, SUM(price)
FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

-- CUMULATE
SELECT window_start, window_end, SUM(price)
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

2.2. GROUPING SETS
2.2.1. Introduction
Grouping sets allow more complex grouping operations than those describable by a standard GROUP BY.
Rows are grouped separately by each specified grouping set, and aggregates are computed for each group just as for a simple GROUP BY clause.
- Window aggregations with grouping sets require that both the window_start and window_end columns appear in the GROUP BY clause itself, not inside the GROUPING SETS clause.
- Each sublist of GROUPING SETS may specify zero or more columns or expressions, interpreted exactly as if they were written directly in the GROUP BY clause.
- An empty grouping set means that all rows are aggregated into a single group, and this group is output even when there are no input rows.
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ())
Note: GROUPING SETS may specify multiple grouping conditions. For example, ((supplier_id), (item), ()) specifies three grouping sets: within each window, the data is aggregated separately by supplier_id, by item, and with all rows collapsed into a single group.
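To make the expansion concrete, here is a minimal plain-Java sketch (no Flink; the class name and row layout are illustrative) that aggregates one window's rows once per grouping set, with unselected key columns reported as null, mirroring the (NULL) cells in the result table below:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSetsSketch {
    // One output row per (grouping set, key value): stringified key -> summed price.
    static Map<String, Double> aggregate(List<String[]> rows, List<int[]> groupingSets) {
        Map<String, Double> out = new LinkedHashMap<>();
        for (int[] set : groupingSets) {
            for (String[] row : rows) {           // row = {supplier_id, item, price}
                String[] key = {null, null};      // unselected key columns become NULL
                for (int col : set) key[col] = row[col];
                out.merge(Arrays.toString(key), Double.parseDouble(row[2]), Double::sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The bids that fall into the 08:00-08:10 window of the example.
        List<String[]> window1 = List.of(
            new String[]{"supplier1", "A", "4.00"},
            new String[]{"supplier1", "A", "2.00"},
            new String[]{"supplier2", "D", "5.00"});
        // GROUPING SETS ((supplier_id), (item), ()): columns {0}, {1}, and none.
        List<int[]> sets = List.of(new int[]{0}, new int[]{1}, new int[]{});
        System.out.println(aggregate(window1, sets));
        // {[supplier1, null]=6.0, [supplier2, null]=5.0, [null, A]=6.0, [null, D]=5.0, [null, null]=11.0}
    }
}
```

Each input row contributes once to every grouping set, which is why the per-supplier, per-item, and grand-total rows all appear for the same window.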
Given the input table:

+------------------+-------+------+-------------+
| bidtime          | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 |  4.00 | A    | supplier1   |
| 2020-04-15 08:07 |  2.00 | A    | supplier1   |
| 2020-04-15 08:09 |  5.00 | D    | supplier2   |
| 2020-04-15 08:11 |  3.00 | F    | supplier2   |
| 2020-04-15 08:13 |  1.00 | E    | supplier1   |
| 2020-04-15 08:17 |  6.00 | F    | supplier2   |
+------------------+-------+------+-------------+

grouping and aggregating with GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), (item), ()) yields:

+----+-------------------------+-------------------------+-------------+--------+-------+
| op | window_start            | window_end              | supplier_id | item   | price |
+----+-------------------------+-------------------------+-------------+--------+-------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | supplier1   | (NULL) |  6.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 | supplier1   | (NULL) |  1.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | supplier2   | (NULL) |  5.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 | supplier2   | (NULL) |  9.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 | (NULL)      | E      |  1.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 | (NULL)      | F      |  9.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | (NULL)      | A      |  6.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | (NULL)      | D      |  5.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | (NULL)      | (NULL) | 11.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 | (NULL)      | (NULL) | 10.00 |
+----+-------------------------+-------------------------+-------------+--------+-------+

2.2.2. Source Code Example
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TVFSQLExample {
private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);
public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
DataStream<Bid> dataStream =
env.fromElements(
new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "F", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier2"));
Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("bidtime", DataTypes.TIMESTAMP(3))
.column("price", DataTypes.DECIMAL(10,2))
.column("item", DataTypes.STRING())
.column("supplier_id", DataTypes.STRING())
.watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
.build());
table.printSchema();
tEnv.createTemporaryView("Bid",table);
String sql = "SELECT window_start, window_end, supplier_id, item, SUM(price) AS price" +
" FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))" +
" GROUP BY window_start, window_end, GROUPING SETS ((supplier_id),(item),()) ";
TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("Bid");
}catch (Exception e){
LOG.error(e.getMessage(),e);
}
}
// Must be a static class with public fields and a no-arg constructor (Flink POJO requirements)
public static class Bid{
public LocalDateTime bidtime;
public BigDecimal price;
public String item;
public String supplier_id;
public Bid() {}
public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
this.bidtime = bidtime;
this.price = price;
this.item = item;
this.supplier_id = supplier_id;
}
}
}
2.3. ROLLUP
ROLLUP is shorthand notation for specifying a common type of grouping set. It represents the given list of expressions plus all prefixes of that list, including the empty list.
- Window aggregations with ROLLUP require that both the window_start and window_end columns appear in the GROUP BY clause itself, not inside the ROLLUP clause.
- ROLLUP is shorthand for GROUPING SETS, for example:
- ROLLUP(item, supplier_id) is equivalent to GROUPING SETS((item, supplier_id), (item), ())
- ROLLUP(supplier_id) is equivalent to GROUPING SETS((supplier_id), ())
- If multiple columns are listed in ROLLUP, e.g. A, B, C, the data is first grouped by A, B, C, then subtotaled by A, B, then by A, and finally totaled over all rows, i.e. subtotals are rolled up step by step on top of the detailed grouping. Note: distinct aggregates, e.g. COUNT(DISTINCT item), are supported.
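The prefix expansion can be sketched in a few lines of plain Java (illustrative, not a Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class RollupSketch {
    // ROLLUP(cols) expands to all prefixes of cols, longest first, down to the empty list.
    static List<List<String>> rollup(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = cols.size(); n >= 0; n--) {
            sets.add(cols.subList(0, n));
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(rollup(List.of("item", "supplier_id")));
        // [[item, supplier_id], [item], []]
    }
}
```

For n columns, ROLLUP produces n + 1 grouping sets, matching the equivalences listed above.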
2.4. CUBE
CUBE is shorthand notation for specifying a common type of grouping set. It represents the given list of expressions together with all of its possible subsets (the power set).
- Window aggregations with CUBE require that both the window_start and window_end columns appear in the GROUP BY clause itself, not inside the CUBE clause.
- CUBE is shorthand for GROUPING SETS, for example:
- CUBE(supplier_id) is equivalent to GROUPING SETS((supplier_id), ())
- CUBE(item, supplier_id) is equivalent to GROUPING SETS((item, supplier_id), (item), (supplier_id), ())
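Likewise, the power-set expansion can be sketched in plain Java (illustrative; the order in which the grouping sets are produced is immaterial):

```java
import java.util.ArrayList;
import java.util.List;

public class CubeSketch {
    // CUBE(cols) expands to every subset of cols (the power set): 2^n grouping sets.
    static List<List<String>> cube(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int mask = (1 << cols.size()) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < cols.size(); i++) {
                if ((mask & (1 << i)) != 0) set.add(cols.get(i));
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(cube(List.of("item", "supplier_id")));
        // [[item, supplier_id], [supplier_id], [item], []]
    }
}
```

For n columns, CUBE produces 2^n grouping sets, versus n + 1 for ROLLUP, so CUBE over many columns multiplies output rows quickly.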
3. Group Window Aggregation
Note: Group Window Aggregation is deprecated; using Window TVF Aggregation is encouraged instead.
Compared with Group Window Aggregation, Window TVF Aggregation has many advantages, including:
- Performance tuning through configuration parameters
- Support for the standard GROUPING SETS syntax
- Window TopN can be applied on top of the window aggregation result
- …
Note:
- Group Window Aggregation is supported in both streaming and batch mode, whereas Window TVF Aggregation is currently supported only in streaming mode.
- Group Window Aggregation supports session windows
The syntax is as follows:
TUMBLE(time_attr, interval)
HOP(time_attr, interval, interval)   -- for HOP, the first interval is the slide and the second is the window size
SESSION(time_attr, interval)

3.2. Selecting Group Window Start and End Timestamps
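Conceptually, a group window's start and end, and the set of HOP windows an element falls into, reduce to modular arithmetic on the time attribute. A minimal plain-Java sketch over epoch minutes (no Flink; class and method names are illustrative, and Flink actually computes on epoch milliseconds with an optional offset):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowBoundsSketch {
    // TUMBLE window bounds: start is ts floored to a multiple of the window size,
    // end is start + size (exclusive upper bound).
    static long[] tumbleBounds(long tsMinute, long size) {
        long start = tsMinute - (tsMinute % size);
        return new long[]{start, start + size};
    }

    // HOP(time_attr, slide, size): each element belongs to size/slide windows;
    // returns the start of every hop window containing ts.
    static List<Long> hopWindowStarts(long tsMinute, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = tsMinute - (tsMinute % slide); // latest window starting at/before ts
        for (long start = lastStart; start > tsMinute - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 08:05 = minute 485 of the day; a 10-minute tumbling window gives [480, 490), i.e. 08:00-08:10.
        long[] b = tumbleBounds(485, 10);
        System.out.println(b[0] + ".." + b[1]);          // 480..490
        // 08:07 = minute 487; slide 5, size 10 -> windows starting at 08:05 and 08:00.
        System.out.println(hopWindowStarts(487, 5, 10)); // [485, 480]
    }
}
```

The auxiliary functions below select exactly these bounds from within a Group Window Aggregation query.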
Auxiliary functions:

TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

3.3. Source Code Example
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TVFSQLExample {
private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);
public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
DataStream<Bid> dataStream =
env.fromElements(
new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "F", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier2"));
Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("bidtime", DataTypes.TIMESTAMP(3))
.column("price", DataTypes.DECIMAL(10,2))
.column("item", DataTypes.STRING())
.column("supplier_id", DataTypes.STRING())
.watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
.build());
table.printSchema();
tEnv.createTemporaryView("Bid",table);
String sql = "SELECT supplier_id, TUMBLE_START(bidtime, INTERVAL '10' MINUTE) AS wStart, TUMBLE_END(bidtime, INTERVAL '10' MINUTE) AS wEnd," +
" SUM(price) as price" +
" FROM Bid " +
" GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE),supplier_id";
TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("Bid");
}catch (Exception e){
LOG.error(e.getMessage(),e);
}
}
// Must be a static class with public fields and a no-arg constructor (Flink POJO requirements)
public static class Bid{
public LocalDateTime bidtime;
public BigDecimal price;
public String item;
public String supplier_id;
public Bid() {}
public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
this.bidtime = bidtime;
this.price = price;
this.item = item;
this.supplier_id = supplier_id;
}
}
}