
flink--Window Aggregation [TVF Aggregation and Group Window Aggregation]: explanation and source examples

1. What is Window TVF Aggregation

Window TVF Aggregation refers to grouped aggregation over the relation produced by a windowing table-valued function (TVF). The syntax is as follows:

SELECT ...
FROM ...   -- relation produced by a windowing TVF
GROUP BY window_start, window_end, ...

A window aggregation requires the window_start and window_end columns of the relation produced by the windowing TVF to appear in its GROUP BY clause. Just like a regular GROUP BY query, a query with a window aggregation computes a single result row per group.

2. Window TVF Aggregation

2.1. Basic usage of Window TVF Aggregation

Flink supports window aggregation over the TUMBLE, HOP, and CUMULATE window types. For detailed usage see: flink--Window TVF [windowing table-valued functions]: explanation and source examples

SQL examples:

-- TUMBLE function
SELECT window_start, window_end, SUM(price)
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

-- HOP function
SELECT window_start, window_end, SUM(price)
FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

-- CUMULATE function
SELECT window_start, window_end, SUM(price)
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end
2.2. GROUPING SETS

2.2.1. Introduction

GROUPING SETS allows grouping operations that are more complex than those describable by a standard GROUP BY.
Rows are grouped separately by each specified grouping set, and an aggregate is computed for each group just as with a simple GROUP BY clause.

  • Requirement for window aggregation with grouping sets: the window_start and window_end columns must appear in the GROUP BY clause itself, not inside the GROUPING SETS clause.
  • Each sublist of GROUPING SETS may specify zero or more columns or expressions, interpreted exactly as if they were used directly in the GROUP BY clause.
  • An empty grouping set means that all rows are aggregated into a single group, which is output even when there are no input rows.
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ())

Note: GROUPING SETS may specify several grouping conditions. For example, ((supplier_id),(item),()) specifies three grouping sets: within each window, the rows are aggregated separately by supplier_id, by item, and with all rows collapsed into a single group.
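Conceptually, each grouping set behaves like an independent GROUP BY over the same windowed relation, with NULL filled in for the columns a set does not group on. A sketch of the equivalent (but much more verbose) UNION ALL rewrite, reusing the Bid table from these examples:

```sql
-- equivalent rewrite of GROUPING SETS ((supplier_id), (item), ())
SELECT window_start, window_end, supplier_id, CAST(NULL AS STRING) AS item, SUM(price) AS price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id
UNION ALL
SELECT window_start, window_end, CAST(NULL AS STRING), item, SUM(price)
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item
UNION ALL
SELECT window_start, window_end, CAST(NULL AS STRING), CAST(NULL AS STRING), SUM(price)
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end
```

GROUPING SETS lets the planner compute all three groupings in a single pass over the windowed input instead of three.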

+------------------+-------+------+-------------+
|          bidtime | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 | 4.00  | A    | supplier1   |
| 2020-04-15 08:07 | 2.00  | A    | supplier1   |
| 2020-04-15 08:09 | 5.00  | D    | supplier2   |
| 2020-04-15 08:11 | 3.00  | F    | supplier2   |
| 2020-04-15 08:13 | 1.00  | E    | supplier1   |
| 2020-04-15 08:17 | 6.00  | F    | supplier2   |

Grouping by GROUP BY window_start, window_end, GROUPING SETS ((supplier_id),(item),()) produces the following result:
| op |            window_start |              window_end | supplier_id |  item |price |
+----+-------------------------+-------------------------+-------------+-------+------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier1 |(NULL) | 6.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier1 |(NULL) | 1.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier2 |(NULL) | 5.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier2 |(NULL) | 9.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |      (NULL) |     E | 1.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |      (NULL) |     F | 9.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |      (NULL) |     A | 6.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |      (NULL) |     D | 5.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |      (NULL) |(NULL) |11.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |      (NULL) |(NULL) |10.00 |
2.2.2. Source example
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            Configuration configuration = tEnv.getConfig().getConfiguration();
			// set low-level key-value options
			configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
			configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
			configuration.setString("table.exec.mini-batch.size", "5000");
			configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
			configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "F", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier2"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .column("supplier_id", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            String sql = "SELECT window_start, window_end, supplier_id, item, SUM(price) AS price" +
                    "  FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))" +
                    "  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id),(item),()) ";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

    // Must be a public static class with public fields (or getters/setters) so Flink can treat it as a POJO
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;
        public String supplier_id;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
            this.supplier_id = supplier_id;
        }
    }
}

2.3. ROLLUP

ROLLUP is shorthand notation for a common family of grouping sets: it stands for the given list of expressions together with all of its prefixes, including the empty list.

  • Requirement for window aggregation with ROLLUP: the window_start and window_end columns must appear in the GROUP BY clause itself, not inside the ROLLUP clause.
  • ROLLUP is shorthand for GROUPING SETS, for example:
    • ROLLUP(item, supplier_id) is equivalent to GROUPING SETS((item, supplier_id), (item), ())
    • ROLLUP(supplier_id) is equivalent to GROUPING SETS((supplier_id), ())
    • If ROLLUP lists several columns, say A, B, and C, the rows are first aggregated by (A, B, C), then subtotaled by (A, B), then by (A), and finally a grand total is computed; in other words, subtotals roll up step by step from the most detailed grouping. Note that distinct aggregates are supported, e.g. COUNT(DISTINCT item).
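A sketch of a ROLLUP window aggregation, reusing the Bid table and columns from section 2.2:

```sql
-- ROLLUP(item, supplier_id) expands to GROUPING SETS((item, supplier_id), (item), ())
SELECT window_start, window_end, item, supplier_id, SUM(price) AS price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (item, supplier_id)
```

Per window, this emits one row per (item, supplier_id) pair, one subtotal row per item (supplier_id is NULL), and one grand-total row (both NULL).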
2.4. CUBE

CUBE is shorthand notation for a common family of grouping sets: it stands for the given list of expressions and all of its possible subsets (the power set).

  • Requirement for window aggregation with CUBE: the window_start and window_end columns must appear in the GROUP BY clause itself, not inside the CUBE clause.
  • CUBE is shorthand for GROUPING SETS, for example:
    • CUBE(supplier_id) is equivalent to GROUPING SETS((supplier_id), ())
    • CUBE(item, supplier_id) is equivalent to GROUPING SETS((item, supplier_id), (item), (supplier_id), ())
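A sketch of a CUBE window aggregation, again reusing the Bid table from section 2.2:

```sql
-- CUBE(item, supplier_id) expands to all four subsets of {item, supplier_id}
SELECT window_start, window_end, item, supplier_id, SUM(price) AS price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (item, supplier_id)
```

Unlike ROLLUP, CUBE also emits the (supplier_id)-only subtotal, since it covers every subset rather than only prefixes.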
3. Group Window Aggregation

Note: Group Window Aggregation is deprecated; Window TVF Aggregation is encouraged instead.
Compared with Group Window Aggregation, Window TVF Aggregation has several advantages, including:

  • performance tuning through configuration parameters
  • support for the standard GROUPING SETS syntax
  • the ability to apply Window TopN on top of the window aggregation result

Note:

  • Group Window Aggregation is supported in both streaming and batch mode, whereas Window TVF Aggregation is only supported in streaming mode.
  • Group Window Aggregation supports session windows.
3.1. Group Window Functions

The syntax is as follows:

TUMBLE(time_attr, interval)
-- for HOP, the first interval specifies the slide size and the second interval specifies the window size
HOP(time_attr, interval, interval)
SESSION(time_attr, interval)
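Since session windows are only available through Group Window Aggregation, a sketch of a session-window query over the Bid table from the earlier examples, assuming a 5-minute inactivity gap:

```sql
-- one window per burst of activity, closed after 5 minutes without a bid from the supplier
SELECT supplier_id,
       SESSION_START(bidtime, INTERVAL '5' MINUTE) AS wStart,
       SESSION_END(bidtime, INTERVAL '5' MINUTE) AS wEnd,
       SUM(price) AS price
FROM Bid
GROUP BY SESSION(bidtime, INTERVAL '5' MINUTE), supplier_id
```

Unlike TUMBLE and HOP, session window boundaries are data-driven: a window ends only when the gap since the last row exceeds the interval.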
3.2. Selecting Group Window Start and End Timestamps

The auxiliary functions are:

TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)
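The _ROWTIME variants return a rowtime attribute rather than a plain timestamp, so the result of one window aggregation can feed a second one. A sketch of such a cascading aggregation, assuming the Bid table from the examples and a hypothetical view name:

```sql
-- 10-minute tumbling totals whose rowtime can be windowed again
CREATE VIEW TenMinuteTotals AS
SELECT
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE) AS rowtime,
  SUM(price) AS total
FROM Bid
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE);

-- hourly totals computed on top of the 10-minute totals
SELECT TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS hStart, SUM(total) AS total
FROM TenMinuteTotals
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR);
```

Using TUMBLE_END instead of TUMBLE_ROWTIME here would not work, because TUMBLE_END returns an ordinary timestamp without the time-attribute property the outer window needs.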
3.3. Source example
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class GroupWindowSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(GroupWindowSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            Configuration configuration = tEnv.getConfig().getConfiguration();
            // set low-level key-value options
            configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
            configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
            configuration.setString("table.exec.mini-batch.size", "5000");
            configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
            configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "F", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier2"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .column("supplier_id", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            String sql = "SELECT supplier_id, TUMBLE_START(bidtime, INTERVAL '10' MINUTE) AS wStart, TUMBLE_END(bidtime, INTERVAL '10' MINUTE) AS wEnd," +
                    "  SUM(price) AS price" +
                    " FROM Bid " +
                    " GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE), supplier_id";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

    // Must be a public static class with public fields (or getters/setters) so Flink can treat it as a POJO
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;
        public String supplier_id;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
            this.supplier_id = supplier_id;
        }
    }
}
