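Window Top-N is a special kind of Top-N that returns the N smallest or largest values for each window and any other partition keys.
- For streaming queries, unlike a regular Top-N on a continuous table, window Top-N does not emit intermediate results. It emits only a final result per window: the Top-N records of the data in that window.
- Window Top-N purges all intermediate state once it is no longer needed, so window Top-N queries perform better when users do not need a result that is updated for every incoming record.
- Window Top-N is usually used together with window aggregation.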
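To make the "final result only, then purge state" behaviour concrete, here is a simplified, hypothetical model in plain Java. It is not Flink's operator code: the class `WindowTopNSketch` and its methods `onElement`, `onWatermark` and `openWindows` are invented for illustration, timestamps are plain `long` values, and windows are tumbling. Rows are buffered per window, nothing is emitted while a window is open, and once the watermark passes the window's last timestamp (end - 1) the Top-N is emitted and the window's state is dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of streaming window Top-N (not Flink internals).
public class WindowTopNSketch {
    public static final class Row {
        final long ts;
        final String key;
        final double value;
        public Row(long ts, String key, double value) { this.ts = ts; this.key = key; this.value = value; }
    }

    private final long windowSize;
    private final int n;
    // window start -> rows buffered for that window, ordered by window start
    private final TreeMap<Long, List<Row>> buffers = new TreeMap<>();

    public WindowTopNSketch(long windowSize, int n) { this.windowSize = windowSize; this.n = n; }

    // Assigns the row to its tumbling window; no result is emitted here.
    public void onElement(Row row) {
        long start = row.ts - Math.floorMod(row.ts, windowSize);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(row);
    }

    // Emits "windowStart,key,rownum" for every window whose last timestamp (end - 1)
    // is <= watermark, then removes that window's buffered state.
    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Row>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Row>> e = it.next();
            if (e.getKey() + windowSize - 1 > watermark) break; // this and later windows are still open
            List<Row> rows = e.getValue();
            rows.sort((a, b) -> Double.compare(b.value, a.value)); // ORDER BY value DESC
            for (int i = 0; i < Math.min(n, rows.size()); i++) {
                out.add(e.getKey() + "," + rows.get(i).key + "," + (i + 1));
            }
            it.remove(); // final result emitted, state no longer needed
        }
        return out;
    }

    public int openWindows() { return buffers.size(); }
}
```

For example, with a window size of 10 and N = 2, rows at timestamps 1, 3, 4 and 12 produce nothing at watermark 5; at watermark 9 the window starting at 0 emits its two largest rows and is cleared, while the window starting at 10 stays buffered.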
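Window Top-N requires the PARTITION BY clause to contain the window_start and window_end columns of the relation produced by applying a windowing TVF or a window aggregation. The general syntax is: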
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- a relation to which a windowing TVF has been applied
WHERE rownum <= N [AND conditions]
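As a rough, batch-style illustration of what the template computes (ignoring streaming and state), the plain-Java sketch below partitions rows by (window_start, window_end), orders each partition by value descending, numbers the rows with ROW_NUMBER semantics and keeps rownum <= N. The names `RowNumberTopN`, `Row`, `Ranked` and `topN` are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batch-style sketch of ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY value DESC)
public class RowNumberTopN {
    public static final class Row {
        public final String windowStart, windowEnd, key;
        public final double value;
        public Row(String windowStart, String windowEnd, String key, double value) {
            this.windowStart = windowStart; this.windowEnd = windowEnd; this.key = key; this.value = value;
        }
    }

    public static final class Ranked {
        public final Row row;
        public final int rownum;
        Ranked(Row row, int rownum) { this.row = row; this.rownum = rownum; }
    }

    public static List<Ranked> topN(List<Row> rows, int n) {
        // PARTITION BY window_start, window_end (partitions kept in first-seen order)
        Map<List<String>, List<Row>> partitions = new LinkedHashMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(Arrays.asList(r.windowStart, r.windowEnd), k -> new ArrayList<>()).add(r);
        }
        List<Ranked> out = new ArrayList<>();
        for (List<Row> part : partitions.values()) {
            part.sort((a, b) -> Double.compare(b.value, a.value)); // ORDER BY value DESC (stable for ties)
            // ROW_NUMBER() assigns 1, 2, 3, ... even for ties; WHERE rownum <= N keeps the first N
            for (int i = 0; i < part.size() && i < n; i++) {
                out.add(new Ranked(part.get(i), i + 1));
            }
        }
        return out;
    }
}
```

Note that ROW_NUMBER never produces duplicate numbers, so at most N rows survive per window even when values tie.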
-- Example SQL: top 3 suppliers by total price in each 10-minute tumbling window
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
  FROM (
    SELECT window_start, window_end, supplier_id, LISTAGG(DATE_FORMAT(bidtime, 'HH:mm:ss'), ','),
      SUM(price) AS price, COUNT(*) AS cnt
    FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, supplier_id
  )) WHERE rownum <= 3;
3. Code Example
3.1. Source Code
package com.streaming.flink.tvf;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TVFSQLExample {
private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);
public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
DataStream<Bid> dataStream =
env.fromElements(
new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:06",dateTimeFormatter), new BigDecimal("4.00"), "C", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "G", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:08",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier4"),
new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
new Bid(LocalDateTime.parse("2020-04-15 08:15",dateTimeFormatter), new BigDecimal("3.00"), "H", "supplier2"),
new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier5"));
Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("bidtime", DataTypes.TIMESTAMP(3))
.column("price", DataTypes.DECIMAL(10,2))
.column("item", DataTypes.STRING())
.column("supplier_id", DataTypes.STRING())
.watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
.build());
table.printSchema();
tEnv.createTemporaryView("Bid",table);
String sql = "SELECT * FROM (" +
" SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum" +
" FROM (" +
" SELECT window_start, window_end, supplier_id, LISTAGG(DATE_FORMAT(bidtime, 'HH:mm:ss'), ','), SUM(price) AS price, COUNT(*) AS cnt" +
" FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))" +
" GROUP BY window_start, window_end, supplier_id" +
" )) WHERE rownum <= 3";
TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("Bid");
}catch (Exception e){
LOG.error(e.getMessage(),e);
}
}
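// Must be a public static class with public fields and a public no-arg constructor so that Flink treats it as a POJO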
public static class Bid{
public LocalDateTime bidtime;
public BigDecimal price;
public String item;
public String supplier_id;
public Bid() {}
public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
this.bidtime = bidtime;
this.price = price;
this.item = item;
this.supplier_id = supplier_id;
}
}
}
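The comment on `Bid` refers to Flink's POJO rules: the class must be public, must be static if it is nested, must have a public no-argument constructor, and its non-static, non-transient fields must either be public or be reachable through public getters and setters. The hypothetical helper `looksLikeFlinkPojo` below is a simplified reflection check of those rules (it only covers the public-field path and ignores getters/setters); it is not Flink's own type extraction logic.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.time.LocalDateTime;

public class PojoRuleCheck {
    // Same shape as the Bid class in the job above: should pass
    public static class Bid {
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;
        public String supplier_id;
        public Bid() {}
    }

    // Non-static inner class: violates the "static if nested" rule
    public class InnerBid { public String item; public InnerBid() {} }

    // Private field without getter/setter: fails this simplified check
    public static class PrivateFieldBid { private String item; public PrivateFieldBid() {} }

    // Hypothetical, simplified check (public fields only; getters/setters are not considered)
    public static boolean looksLikeFlinkPojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) return false;
        try {
            c.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            return false; // no public no-arg constructor
        }
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) continue;
            if (!Modifier.isPublic(m) || Modifier.isFinal(m)) return false;
        }
        return true;
    }
}
```

If `Bid` were declared as a non-static inner class, the job would likely fall back to treating it as a generic type, and the named columns in the `Schema` could not be mapped to its fields.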
3.2. Output
+----+-------------------------+-------------------------+-------------+-------+-----+--------+
| op |            window_start |              window_end | supplier_id | price | cnt | rownum |
+----+-------------------------+-------------------------+-------------+-------+-----+--------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier1 |  6.00 |   2 |      1 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier4 |  5.00 |   1 |      2 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier2 |  4.00 |   1 |      3 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier5 |  6.00 |   1 |      1 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier2 |  3.00 |   1 |      2 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier3 |  2.00 |   1 |      3 |
+----+-------------------------+-------------------------+-------------+-------+-----+--------+
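To cross-check these rows by hand, the plain-Java sketch below replays the same nine bids without Flink: it assigns each bid to its 10-minute tumbling window (truncating to the hour and rounding the minute down to a multiple of 10, which matches TUMBLE with no offset for this data), computes SUM(price) and COUNT(*) per (window, supplier), and keeps the top 3 per window by price descending. The class `BidTopNCheck` and its output format are hypothetical; the `item` column is omitted because the query does not use it.

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BidTopNCheck {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // Same sample bids as the Flink job: {bidtime, price, supplier_id}
    static final String[][] BIDS = {
        {"2020-04-15 08:05", "4.00", "supplier1"}, {"2020-04-15 08:06", "4.00", "supplier2"},
        {"2020-04-15 08:07", "2.00", "supplier1"}, {"2020-04-15 08:08", "2.00", "supplier3"},
        {"2020-04-15 08:09", "5.00", "supplier4"}, {"2020-04-15 08:11", "2.00", "supplier3"},
        {"2020-04-15 08:13", "1.00", "supplier1"}, {"2020-04-15 08:15", "3.00", "supplier2"},
        {"2020-04-15 08:17", "6.00", "supplier5"},
    };

    // Start of the 10-minute tumbling window containing t
    static LocalDateTime windowStart(LocalDateTime t) {
        return t.truncatedTo(ChronoUnit.HOURS).plusMinutes(t.getMinute() / 10 * 10L);
    }

    // Returns "windowStart supplier_id price cnt rownum" lines, ordered by window
    public static List<String> run() {
        Map<LocalDateTime, Map<String, BigDecimal>> sums = new TreeMap<>();   // SUM(price)
        Map<LocalDateTime, Map<String, Integer>> counts = new TreeMap<>();    // COUNT(*)
        for (String[] b : BIDS) {
            LocalDateTime ws = windowStart(LocalDateTime.parse(b[0], FMT));
            sums.computeIfAbsent(ws, k -> new HashMap<>()).merge(b[2], new BigDecimal(b[1]), BigDecimal::add);
            counts.computeIfAbsent(ws, k -> new HashMap<>()).merge(b[2], 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<LocalDateTime, Map<String, BigDecimal>> w : sums.entrySet()) {
            List<Map.Entry<String, BigDecimal>> ranked = new ArrayList<>(w.getValue().entrySet());
            ranked.sort((a, b) -> b.getValue().compareTo(a.getValue())); // ORDER BY price DESC
            for (int i = 0; i < Math.min(3, ranked.size()); i++) {        // WHERE rownum <= 3
                String supplier = ranked.get(i).getKey();
                out.add(w.getKey().toLocalTime() + " " + supplier + " " + ranked.get(i).getValue().toPlainString()
                        + " " + counts.get(w.getKey()).get(supplier) + " " + (i + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints one line per result row, for example `08:00 supplier1 6.00 2 1` for the first row of the table.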



