Flink: Over Aggregation Usage and Source Code Example

1. What is Over Aggregation

OVER aggregation: computes an aggregate value over an ordered range of rows for every input row.

  • Unlike GROUP BY aggregation, OVER aggregation does not reduce the number of result rows to one row per group.
  • Instead, OVER aggregation produces a new column for every input row, containing the aggregate value (see the sketch after this list).
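
As a minimal sketch of the difference, using the Orders table from the examples below: the GROUP BY query returns one result row per product, while the OVER query returns one result row per input order, each carrying the aggregate value.

-- GROUP BY: one result row per product
SELECT product, SUM(amount) AS total_amount
FROM Orders
GROUP BY product

-- OVER: one result row per input row
SELECT order_id, product, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time) AS running_amount
FROM Orders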
2. Over Aggregation Syntax
SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

Note: a WINDOW clause can be used to define an OVER window outside of the SELECT clause. This makes the query more readable and also allows the window definition to be reused for multiple aggregations.

Examples:

-- The following query computes, for every order, the sum of amounts of all orders for the same product received within one hour before the current order.
SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders

-- The following query computes, for every order, both the sum and the average amount of all orders for the same product received within one hour before the current order.
SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
2.1. ORDER BY

OVER windows are defined on an ordered sequence of rows.
Since tables have no inherent order, the ORDER BY clause is mandatory.
For streaming queries, Flink currently only supports OVER windows defined with an ascending time attribute order; other orderings are not supported.
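
For reference, a time attribute is typically declared with a WATERMARK clause in the table DDL so that it can be used in ORDER BY (a minimal sketch; the connector choice and column names are illustrative):

CREATE TABLE Orders (
  order_id STRING,
  product STRING,
  amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  -- declares order_time as an event-time attribute, tolerating 5 seconds of out-of-orderness
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
)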

2.2. PARTITION BY

OVER windows can be defined on a partitioned table.
If a PARTITION BY clause is present, the aggregate for each input row is computed only over the rows of its partition. This is equivalent to grouping by the partition columns when computing the aggregate.
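
If the PARTITION BY clause is omitted, the aggregate is computed over all input rows instead of per partition (a minimal sketch based on the earlier Orders query):

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_total_amount
FROM Orders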

2.3. Range Definitions

The range definition specifies how many rows are included in the aggregate. The range is defined with a BETWEEN clause that specifies a lower and an upper boundary; all rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW as the upper boundary.

There are two ways to define the range: ROWS intervals and RANGE intervals.

2.3.1. RANGE intervals

A RANGE interval is defined on the values of the ORDER BY column, which in Flink is always a time attribute.

-- The following RANGE interval specifies that all rows whose time attribute is at most 30 minutes less than the current row's are included in the aggregate.
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
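
Embedded in a full query, the clause looks as follows (a sketch against the Orders table from section 2):

SELECT order_id, order_time, amount,
  COUNT(*) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
  ) AS orders_last_30min
FROM Orders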
2.3.2. ROWS intervals

A ROWS interval is a count-based interval. It defines exactly how many rows are included in the aggregate.

-- The following ROWS interval specifies that the 10 rows preceding the current row and the current row itself (11 rows in total) are included in the aggregate.
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

-- Example SQL
SELECT bidtime, price, item, supplier_id,
  MAX(price) OVER (
    PARTITION BY item
    ORDER BY bidtime
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS max_price
FROM Bid
3. Source Code Example

3.1. Source Code
package com.streaming.flink.tvf;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            Configuration configuration = tEnv.getConfig().getConfiguration();
            // set low-level key-value options
            configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
            configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
            configuration.setString("table.exec.mini-batch.size", "5000");
            configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
            configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:06",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:08",dateTimeFormatter), new BigDecimal("2.00"), "A", "supplier3"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "B", "supplier4"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "B", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:15",dateTimeFormatter), new BigDecimal("3.00"), "B", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "B", "supplier5"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .column("supplier_id", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            String sql = "SELECt bidtime, price, item,supplier_id," +
                    "  SUM(price) OVER (" +
                    "    PARTITION BY item " +
                    "    ORDER BY bidtime " +
//                    "    RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW" +
                    "    ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
                    "  ) AS max_sum" +
                    " FROM Bid";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

    // Must be a public static class with public fields (or a public no-arg constructor and getters/setters) so that Flink can map it as a POJO
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;
        public String supplier_id;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
            this.supplier_id = supplier_id;
        }
    }
}

3.2. Output
+----+-------------------------+--------------+------+-------------+----------------+
| op |                 bidtime |        price | item | supplier_id |        max_sum |
+----+-------------------------+--------------+------+-------------+----------------+
| +I | 2020-04-15 08:05:00.000 |         4.00 |    A |   supplier1 |           4.00 |
| +I | 2020-04-15 08:06:00.000 |         4.00 |    A |   supplier2 |           8.00 |
| +I | 2020-04-15 08:07:00.000 |         2.00 |    A |   supplier1 |          10.00 |
| +I | 2020-04-15 08:08:00.000 |         2.00 |    A |   supplier3 |          12.00 |
| +I | 2020-04-15 08:09:00.000 |         5.00 |    B |   supplier4 |           5.00 |
| +I | 2020-04-15 08:11:00.000 |         2.00 |    B |   supplier3 |           7.00 |
| +I | 2020-04-15 08:13:00.000 |         1.00 |    B |   supplier1 |           8.00 |
| +I | 2020-04-15 08:15:00.000 |         3.00 |    B |   supplier2 |          11.00 |
| +I | 2020-04-15 08:17:00.000 |         6.00 |    B |   supplier5 |          12.00 |
+----+-------------------------+--------------+------+-------------+----------------+
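
The ROWS BETWEEN 3 PRECEDING AND CURRENT ROW window covers at most 4 rows per partition. For item A, the row at 08:08 yields 4.00 + 4.00 + 2.00 + 2.00 = 12.00. For item B, the row at 08:17 yields 2.00 + 1.00 + 3.00 + 6.00 = 12.00, because the 08:09 row has already slid out of the window.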