- 窗口 top-N flink 1.12
package com.cn.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Flink 1.12 SQL windowed Top-N demo: reads bids from Kafka, sums {@code price}
 * per supplier over 10-minute tumbling event-time windows, and keeps the top 3
 * suppliers per window via {@code ROW_NUMBER()}.
 */
public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTenv = StreamTableEnvironment.create(env);

        // Kafka-backed source table; the watermark tolerates 5 seconds of out-of-order data.
        // BUGFIX: the original concatenated a bare "n" after each fragment (a lost "\n"
        // escape), corrupting the SQL text (e.g. "TIMESTAMP(3),n"); restored real newlines.
        String sql =
                "CREATE TABLE TEST (\n" +
                "`bidtime` TIMESTAMP(3),\n" +
                "`price` FLOAT,\n" +
                "`item` VARCHAR(3),\n" +
                "`supplier_id` STRING,\n" +
                " WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECOND)\n" +
                "WITH(" +
                "'connector'='kafka',\n" +
                "'properties.bootstrap.servers'='localhost:9092',\n" +
                "'topic'='kafkaS',\n" +
                "'properties.group.id'='a'," +
                "'format'='csv')";
        sTenv.executeSql(sql);

        // Print-connector sink table for the per-window Top-N result.
        sTenv.executeSql("CREATE TABLE TESTS (\n" +
                "`window_start` TIMESTAMP(3) NOT NULL,\n" +
                "`window_end` TIMESTAMP(3) NOT NULL,\n" +
                "`supplier_id` STRING,\n" +
                "`price` FLOAT,\n" +
                "`rownum` BIGINT NOT NULL,\n" +
                "`cnt` BIGINT NOT NULL)\n" +
                "WITH(" +
                "'connector'='print')");

        // Per-window Top-3 suppliers by summed price.
        // BUGFIX: the outer projection now lists columns explicitly. ROW_NUMBER() is
        // appended AFTER cnt by the inner SELECT *, while the sink declares rownum
        // BEFORE cnt — both are BIGINT, so a SELECT * would silently swap the values.
        sTenv.executeSql("INSERT INTO TESTS\n" +
                "SELECT window_start, window_end, supplier_id, price, rownum, cnt\n" +
                " FROM (\n" +
                "   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum\n" +
                "   FROM (\n" +
                "     SELECT\n" +
                "       TUMBLE_START(bidtime, INTERVAL '10' MINUTES) AS window_start,\n" +
                "       TUMBLE_END(bidtime, INTERVAL '10' MINUTES) AS window_end,\n" +
                "       supplier_id,\n" +
                "       SUM(price) AS price,\n" +
                "       COUNT(*) AS cnt FROM TEST\n" +
                "     GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES), supplier_id)\n" +
                " )\n" +
                " WHERE rownum <= 3");
    }
}
//也可以 使用CTE 公共表表达式增加可读性
//todo 执行逻辑
sTenv.executeSql("INSERT INTO TESTS\n" +
        "WITH\n" +
        "orders_with_total AS (\n" +
        "  SELECT\n" +
        "    TUMBLE_START(bidtime, INTERVAL '10' MINUTES) AS window_start,\n" +
        "    TUMBLE_END(bidtime, INTERVAL '10' MINUTES) AS window_end,\n" +
        "    supplier_id,\n" +
        "    SUM(price) AS price,\n" +
        "    COUNT(*) AS cnt FROM TEST\n" +
        "  GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES), supplier_id)," +
        "orders_with_rownum AS (\n" +
        "  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum\n" +
        "  FROM orders_with_total\n" +
        ")\n" +
        // 注意:显式列出列名,因为 rownum 在 cnt 之后生成,而 TESTS 表里 rownum 在 cnt 之前
        " SELECT window_start, window_end, supplier_id, price, rownum, cnt\n" +
        " FROM orders_with_rownum\n" +
        " WHERE rownum <= 3");
- 窗口 top-N flink 1.14
(14 版本的 明显简化不少)
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
FROM (
SELECT window_start, window_end, supplier_id, SUM(price) AS price, COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id
)
) WHERE rownum <= 3
3. 时间类型的使用 确定watermark
(默认 TIMESTAMP 类型 是 yyyy-MM-dd HH:mm:ss.SSSSSSSSS)
1 当类型不是默认类型 例如 yyyy-MM-dd HH:mm
"CREATE TABLE TEST (" +
"`bidtime` VARCHAR(20),\n" +
"`price` FLOAT,\n" +
"`item` VARCHAR(3),\n" +
"`supplier_id` STRING,\n" +
"`ts` AS TO_TIMESTAMP(bidtime,'yyyy-MM-dd HH:mm'),\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)\n" +
"WITH(" +
"'connector'='kafka',\n" +
"'properties.bootstrap.servers'='localhost:9092',\n" +
"'topic'='kafkaS',\n" +
"'properties.group.id'='a'," +
"'format'='csv')";
2 时间戳 (长整型的数字)
"CREATE TABLE TEST (" +
"`bidtime` BIGINT,\n" +
"`price` FLOAT,\n" +
"`item` VARCHAR(3),\n" +
"`supplier_id` STRING,\n" +
"`ts` AS TO_TIMESTAMP(FROM_UNIXTIME(bidtime,'yyyy-MM-dd HH:mm')),\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)\n" +
"WITH(" +
"'connector'='kafka',\n" +
"'properties.bootstrap.servers'='localhost:9092',\n" +
"'topic'='kafkaS',\n" +
"'properties.group.id'='a'," +
"'format'='csv')"
注意:这里的 TIMESTAMP 类型默认格式为 yyyy-MM-dd HH:mm:ss。如果输入的格式不一致,会造成格式化异常,需要用上面的转化,即 "ts AS TO_TIMESTAMP(bidtime,'yyyy-MM-dd HH:mm')"。
如果时间是以时间戳的形式出现,那么类型是 BIGINT:flink 1.14 版本需要用 TO_TIMESTAMP_LTZ() 转化一下;如果是 flink 1.12 版本,用 "ts AS TO_TIMESTAMP(FROM_UNIXTIME(bidtime,'yyyy-MM-dd HH:mm'))",就是先转化为本地时间字符串,然后再转化为时间类型。所以在埋点数据或者其他数据中,最好使用时间格式而不是时间戳。



