Flink: Window Top-N usage explained, with a source code example

1. What is Window Top-N

Window Top-N is a special kind of Top-N query: for each window (and any additional partition keys) it returns the N smallest or largest values.

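  • For streaming queries, unlike a regular Top-N on a continuous table, Window Top-N does not emit intermediate results. It emits only the final result, that is, the Top-N records of the data in a window, once the window ends.
  • Window Top-N clears all intermediate state once it is no longer needed, so if you do not need the result to be updated for every record, a Window Top-N query performs better.
  • Window Top-N is usually used together with window aggregation.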
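
The difference in emitted results can be illustrated with a toy model. This is a minimal sketch in plain Java, not Flink code: the names EmissionSketch, continuousTop1 and windowTop1 are hypothetical, and the +I/-U/+U labels only imitate Flink's changelog notation; the exact changes Flink emits for a regular Top-N may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink): rows emitted by a Top-1 query over the prices of one window. */
final class EmissionSketch {

    /** Regular Top-1: a result is emitted, and later updated, as each record arrives. */
    static List<String> continuousTop1(int[] prices) {
        List<String> emitted = new ArrayList<>();
        Integer best = null;
        for (int p : prices) {
            if (best == null) {
                emitted.add("+I " + p);      // first result
            } else if (p > best) {
                emitted.add("-U " + best);   // retract the previous maximum
                emitted.add("+U " + p);      // publish the new maximum
            } else {
                continue;                    // ranking unchanged, nothing emitted
            }
            best = p;
        }
        return emitted;
    }

    /** Window Top-1: records are buffered and one final row is emitted when the window closes. */
    static List<String> windowTop1(int[] prices) {
        List<String> emitted = new ArrayList<>();
        if (prices.length == 0) {
            return emitted;
        }
        int best = prices[0];
        for (int p : prices) {
            best = Math.max(best, p);
        }
        emitted.add("+I " + best);
        return emitted;
    }

    public static void main(String[] args) {
        int[] prices = {4, 2, 5, 6};
        System.out.println(continuousTop1(prices));
        System.out.println(windowTop1(prices));
    }
}
```

For the same four prices, the regular variant produces a sequence of inserts and updates, while the windowed variant produces a single insert.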
2. Window Top-N syntax

Window Top-N requires the PARTITION BY clause to contain the window_start and window_end columns of the relation produced by a windowing TVF or by a window aggregation.

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]
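
To make the window_start and window_end columns more concrete, here is a rough sketch of how a tumbling window could assign them to a row. It is plain Java rather than Flink internals; TumbleSketch, windowStart and windowEnd are hypothetical names, and the sketch assumes windows aligned to the epoch in UTC with an exclusive window end.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Sketch (assumption: epoch-aligned tumbling windows, timestamps interpreted in UTC). */
final class TumbleSketch {

    /** Inclusive start of the tumbling window that contains ts. */
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // Round down to the nearest multiple of the window size.
        long start = millis - Math.floorMod(millis, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    /** Exclusive end of the tumbling window that contains ts. */
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        LocalDateTime bidtime = LocalDateTime.of(2020, 4, 15, 8, 5);
        Duration size = Duration.ofMinutes(10);
        System.out.println(windowStart(bidtime, size) + " .. " + windowEnd(bidtime, size));
    }
}
```

Every row in the same window gets the same (window_start, window_end) pair, which is why partitioning by these two columns ranks rows window by window.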

-- Example SQL:
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id,LISTAGG(DATE_FORMAT(bidtime,'HH:mm:ss'),','),
       SUM(price) as price, COUNT(*) as cnt
      FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )) WHERE rownum <= 3;
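
The two steps of the example query (window aggregation, then Top-N per window) can be traced by hand with a small simulation. This is a minimal sketch in plain Java and does not use Flink: WindowTopNSketch, topSuppliers and sampleBids are hypothetical names, the bids are the ones used in the source code of section 3, the LISTAGG column is left out, and windows are computed on the time of day only.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation (not Flink) of: SUM/COUNT per window and supplier, then Top-N per window. */
final class WindowTopNSketch {

    static final class Bid {
        final LocalTime time;
        final BigDecimal price;
        final String supplier;

        Bid(String time, String price, String supplier) {
            this.time = LocalTime.parse(time);
            this.price = new BigDecimal(price);
            this.supplier = supplier;
        }
    }

    /** Returns lines of the form "window_start supplier_id price cnt rownum". */
    static List<String> topSuppliers(List<Bid> bids, int windowMinutes, int n) {
        // Step 1: window aggregation, keyed by window start and then by supplier.
        Map<LocalTime, Map<String, BigDecimal>> sums = new TreeMap<>();
        Map<LocalTime, Map<String, Integer>> counts = new TreeMap<>();
        for (Bid b : bids) {
            int minuteOfDay = b.time.getHour() * 60 + b.time.getMinute();
            LocalTime start = LocalTime.ofSecondOfDay((minuteOfDay - minuteOfDay % windowMinutes) * 60L);
            sums.computeIfAbsent(start, k -> new TreeMap<>()).merge(b.supplier, b.price, BigDecimal::add);
            counts.computeIfAbsent(start, k -> new TreeMap<>()).merge(b.supplier, 1, Integer::sum);
        }
        // Step 2: Top-N, ranking suppliers inside each window by total price, descending.
        List<String> out = new ArrayList<>();
        for (Map.Entry<LocalTime, Map<String, BigDecimal>> window : sums.entrySet()) {
            List<Map.Entry<String, BigDecimal>> ranked = new ArrayList<>(window.getValue().entrySet());
            ranked.sort(Map.Entry.<String, BigDecimal>comparingByValue().reversed());
            for (int i = 0; i < Math.min(n, ranked.size()); i++) {
                Map.Entry<String, BigDecimal> e = ranked.get(i);
                int cnt = counts.get(window.getKey()).get(e.getKey());
                out.add(window.getKey() + " " + e.getKey() + " " + e.getValue() + " " + cnt + " " + (i + 1));
            }
        }
        return out;
    }

    /** The nine bids from the article's example (time, price, supplier). */
    static List<Bid> sampleBids() {
        List<Bid> bids = new ArrayList<>();
        bids.add(new Bid("08:05", "4.00", "supplier1"));
        bids.add(new Bid("08:06", "4.00", "supplier2"));
        bids.add(new Bid("08:07", "2.00", "supplier1"));
        bids.add(new Bid("08:08", "2.00", "supplier3"));
        bids.add(new Bid("08:09", "5.00", "supplier4"));
        bids.add(new Bid("08:11", "2.00", "supplier3"));
        bids.add(new Bid("08:13", "1.00", "supplier1"));
        bids.add(new Bid("08:15", "3.00", "supplier2"));
        bids.add(new Bid("08:17", "6.00", "supplier5"));
        return bids;
    }

    public static void main(String[] args) {
        topSuppliers(sampleBids(), 10, 3).forEach(System.out::println);
    }
}
```

With 10-minute windows and N = 3, the simulation keeps three suppliers in each of the two windows; supplier3 drops out of the first window and supplier1 out of the second.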
3. Source code example

3.1. Source code
package com.streaming.flink.tvf;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

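            // The Blink planner is the default planner in the Flink versions that support
            // window TVFs, and useBlinkPlanner() is deprecated since Flink 1.14, so it is not called.
            settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .build();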
            tEnv = StreamTableEnvironment.create(env, settings);
            Configuration configuration = tEnv.getConfig().getConfiguration();
            // set low-level key-value options
            configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation requires mini-batch to be enabled
            configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
            configuration.setString("table.exec.mini-batch.size", "5000");
            configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
            configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "A", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:06",dateTimeFormatter), new BigDecimal("4.00"), "C", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "G", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:08",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D", "supplier4"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("2.00"), "B", "supplier3"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E", "supplier1"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:15",dateTimeFormatter), new BigDecimal("3.00"), "H", "supplier2"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F", "supplier5"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .column("supplier_id", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            String sql = "SELECT * FROM (" +
                    "    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum" +
                    "    FROM (" +
                    "      SELECT window_start, window_end, supplier_id,LISTAGG(DATE_FORMAT(bidtime,'HH:mm:ss'),','), SUM(price) as price, COUNT(*) as cnt" +
                    "      FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))" +
                    "      GROUP BY window_start, window_end, supplier_id" +
                    "    )) WHERE rownum <= 3";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

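    // Must be a public static class. Flink's POJO rules also call for a public no-arg
    // constructor and fields that are public (or exposed through getters and setters).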
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;
        public String supplier_id;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item,String supplier_id) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
            this.supplier_id = supplier_id;
        }
    }
}

3.2. Output
| op |            window_start |              window_end | supplier_id | price |cnt|rownum |
+----+-------------------------+-------------------------+-------------+-------+---+-------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier1 |  6.00 | 2 | 1     |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier4 |  5.00 | 1 | 2     |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |   supplier2 |  4.00 | 1 | 3     |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier5 |  6.00 | 1 | 1     |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier2 |  3.00 | 1 | 2     |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |   supplier3 |  2.00 | 1 | 3     |
When reposting, please credit the source: reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/618695.html