栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkSQL -- Window Top-N (flink 1.12 或者 1.14)

FlinkSQL -- Window Top-N (flink 1.12 或者 1.14)

  1. 窗口 top-N flink 1.12
package com.cn.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class SqlDemo {
    /**
     * Window Top-N demo for Flink 1.12.
     *
     * <p>Reads bid events from a Kafka topic, aggregates {@code price} per
     * {@code supplier_id} over 10-minute tumbling event-time windows, and emits
     * the top 3 suppliers of each window (by total price) to a print sink.
     *
     * <p>Fix: the original listing had lost the backslashes of every {@code \n}
     * escape (strings read {@code "...,n"}), which injected a stray {@code n}
     * into the generated SQL and made each statement invalid. The escapes are
     * restored below.
     *
     * @param args unused command-line arguments
     * @throws Exception if SQL execution / job submission fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTenv = StreamTableEnvironment.create(env);
        // TODO: create the Kafka source table (CSV records; event time = bidtime).
        String sql =
                "CREATE TABLE TEST (" +
                        "`bidtime` TIMESTAMP(3),\n" +
                        "`price` FLOAT,\n" +
                        "`item` VARCHAr(3),\n" +
                        "`supplier_id` STRING,\n" +
                        // Watermark bounds out-of-orderness to 5 seconds.
                        " WATERMARK FOR bidtime as bidtime - INTERVAL '5' SECOND)\n" +
                        "WITH(" +
                        "'connector'='kafka',\n" +
                        "'properties.bootstrap.servers'='localhost:9092',\n" +
                        "'topic'='kafkaS',\n" +
                        "'properties.group.id'='a'," +
                        "'format'='csv')";
        sTenv.executeSql(sql);
        // TODO: create the output (print-connector) table.
        sTenv.executeSql("CREATE TABLE TESTS (" +
                "`window_start`  TIMESTAMP(3) NOT NULL,\n" +
                "`window_end`  TIMESTAMP(3) NOT NULL,\n" +
                "`supplier_id` STRING ,\n" +
                "`price` FLOAT,\n" +
                "`rownum` BIGINT NOT NULL,\n" +
                "`cnt` BIGINT NOT NULL\n)" +
                "WITH(" +
                "'connector'='print')");
        // TODO: run the Top-N query. Inner query: per-window/per-supplier SUM and
        // COUNT via the 1.12 group-window functions (TUMBLE / TUMBLE_START /
        // TUMBLE_END). Outer query: ROW_NUMBER() partitioned by window, ordered
        // by total price DESC, kept where rownum <= 3.
        sTenv.executeSql("INSERT INTO TESTS SELECt *\n" +
                "  FROM (\n" +
                "    SELECt *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum\n" +
                "    FROM (\n" +
                "    SELECt  \n" +
                "    TUMBLE_START(bidtime, INTERVAL '10' MINUTES) as  window_start,  \n" +
                "    TUMBLE_END(bidtime, INTERVAL '10' MINUTES) as  window_end, \n " +
                "    supplier_id, \n " +
                "    SUM(price) as price,\n" +
                "    COUNT(*) as cnt  FROM TEST \n" +
                "    GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES),supplier_id )\n" +
                "    ) \n" +
                "   WHERe rownum <= 3 ");
    }
}
//也可以 使用CTE 公共表表达式增加可读性
   // TODO: execution logic — CTE (WITH-clause) variant of the same Top-N query,
   // for readability. Fixes from the original listing: restored the lost "\n"
   // escapes, and renamed the second CTE from the typo "orders_with_totasl" to
   // "orders_with_rownum" (purely a local alias; the query result is unchanged).
        sTenv.executeSql("INSERT INTO TESTS \n" +
                "WITH \n" +
                // Per-window / per-supplier totals (Flink 1.12 group windows).
                "orders_with_total AS (\n" +
                "    SELECt  \n" +
                "    TUMBLE_START(bidtime, INTERVAL '10' MINUTES) as  window_start,  \n" +
                "    TUMBLE_END(bidtime, INTERVAL '10' MINUTES) as  window_end, \n " +
                "    supplier_id, \n " +
                "    SUM(price) as price,\n" +
                "    COUNT(*) as cnt  FROM TEST \n" +
                "    GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES),supplier_id )," +
                // Rank suppliers inside each window by total price.
                "orders_with_rownum AS ( \n" +
                "    SELECt *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum\n" +
                "    FROM orders_with_total \n" +
                ")  \n" +
                " SELECt *\n" +
                "  FROM orders_with_rownum\n" +
                "  WHERe rownum <= 3 ");
  1. 窗口 top-N flink 1.14
    (14 版本的 明显简化不少)
-- Window Top-N in Flink 1.14: the windowing TVF (TUMBLE over a DESCRIPTOR
-- column) replaces the 1.12 TUMBLE/TUMBLE_START/TUMBLE_END group-window
-- functions; window_start/window_end become ordinary grouping columns.
-- (SQL keywords are case-insensitive, so the mixed casing below is legal.)
SELECt *
  FROM (
    SELECt *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECt window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DEscriptOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERe rownum <= 3

3. 时间类型的使用 确定watermark

(默认 TIMESTAMP 类型 是 yyyy-MM-dd HH:mm:ss.SSSSSSSSS)

1 当类型不是默认类型 例如 yyyy-MM-dd HH:mm
CREATE TABLE TEST (" +
                        "`bidtime` VARCHAr(20),n" +
                        "`price` FLOAT,n" +
                        "`item` VARCHAr(3),n" +
                        "`supplier_id` STRING,n" +
                        "`ts` AS TO_TIMESTAMP(bidtime,'yyyy-MM-dd HH:mm'),n" +
                        " WATERMARK FOR ts as ts - INTERVAL '5' SECOND)n" +
                        "WITH(" +
                        "'connector'='kafka',n" +
                        "'properties.bootstrap.servers'='localhost:9092',n" +
                        "'topic'='kafkaS',n" +
                        "'properties.group.id'='a'," +
                        "'format'='csv');

2 时间戳 (长整型的数字)
 // Variant 2: bidtime arrives as an epoch timestamp (BIGINT). In Flink 1.12,
 // convert via FROM_UNIXTIME (to a local-time string) and then TO_TIMESTAMP;
 // in 1.14, TO_TIMESTAMP_LTZ(bidtime, 3) would be used instead.
 // (Restored the "\n" escapes that the original listing had lost.)
 "CREATE TABLE TEST (" +
                        "`bidtime` BIGINT,\n" +
                        "`price` FLOAT,\n" +
                        "`item` VARCHAr(3),\n" +
                        "`supplier_id` STRING,\n" +
                        "`ts` AS TO_TIMESTAMP(FROM_UNIXTIME(bidtime,'yyyy-MM-dd HH:mm')),\n" +
                        " WATERMARK FOR ts as ts - INTERVAL '5' SECOND)\n" +
                        "WITH(" +
                        "'connector'='kafka',\n" +
                        "'properties.bootstrap.servers'='localhost:9092',\n" +
                        "'topic'='kafkaS',\n" +
                        "'properties.group.id'='a'," +
                        "'format'='csv')"

注意:这里的 TIMESTAMP 类型默认为 yyyy-MM-dd HH:mm:ss,如果输入的格式不一致,会造成格式化异常,需要上面的转化,即 "ts AS TO_TIMESTAMP(bidtime,'yyyy-MM-dd HH:mm')"。
如果时间是以时间戳(BIGINT 类型)的形式出现:flink 1.14 版本需要用 TO_TIMESTAMP_LTZ() 转化一下;如果是 flink 1.12 版本,则用 "ts AS TO_TIMESTAMP(FROM_UNIXTIME(bidtime,'yyyy-MM-dd HH:mm'))",即先转化为本地时间字符串,然后再转化为时间类型。所以埋点数据或者其他数据最好使用时间格式,而不是时间戳。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/434043.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号