2016年12月,国际标准化组织(ISO)发布了新版SQL标准,其中包括Row Pattern Recognition in SQL (ISO/IEC TR 19075-5:2016)。标准中有很多示例,感兴趣的可以看一下。
Row Pattern Recognition 标准允许Flink使用 MATCH_RECOGNIZE clause合并CEP和SQL API,以便在SQL中处理复杂的事件。
1.2. Pattern Recognition支持的任务MATCH_RECOGNIZE clause 支持以下任务:
- 对数据进行逻辑分区和排序【PARTITION BY and ORDER BY clauses】
- 定义要查找的行模式,语法类似于正则表达式的语法【PATTERN clause】
- 指定行模式变量的逻辑组件【DEFINE clause】
- 定义度量表达式【即输出的内容】,这些表达式可用于SQL查询的其他部分【MEASURES clause】
每个MATCH_RECOGNIZE查询都由以下子句组成:
- PARTITION BY - 定义表的逻辑分区,类似于GROUP BY操作。
- ORDER BY - 指定传入的行应该如何排序;这很重要,因为模式依赖于顺序。
- MEASURES - 定义输出的内容,类似于SELECt子句。
- ONE ROW PER MATCH - 输出模式,它定义了每个匹配应该产生多少行。
- AFTER MATCH SKIP - 指定下一个匹配应该从哪里开始;这也是一种控制单个事件可以属于多少不同匹配的方法。
- PATTERN - 构建将要搜索的模式,语法类似于正则表达式
- DEFINE - 定义模式变量必须满足的条件,类似于WHERe子句。
- 如果没有为模式变量定义条件,则将使用默认条件,该条件对每一行的计算结果为true。
- 必须满足指定的条件,行才能被分类到相应的模式变量
注意: 目前,MATCH_RECOGNIZE 只能应用于 append table。此外它还总是生成一个 append table。
MATCH_RECOGNIZE SQL示例如下:
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.price AS lastPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B* C)
DEFINE
A AS A.price > 10,
B AS B.price < 15,
C AS C.price > 12
)
下面针对SQL中的重点知识进行讲解
2.1. Aggregations聚合可以在DEFINE和MEASURES clause中使用。支持内置函数和自定义用户定义函数。
聚合函数应用于映射到匹配项的行的每个子集
只要这些事件的平均价格不超过15,该查询就将这些事件作为模式变量A的一部分进行累积
SELECt *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp,
LAST(A.rowtime) AS end_tstamp,
AVG(A.price) AS avgPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
注意:
- 聚合可以应用于表达式,但仅当它们引用单个模式变量时。因此SUM(A.price * A.tax)是有效的,但是AVG(a .tax)是有效的,但AVG(A.price * B.tax)是无效的
- DISTINCT aggregations 目前还不支持
每个模式都是由称为模式变量的基本构建块构成的,可以对其应用操作符(量词和其他修饰词)。整个模式必须用括号括起来。
示例如下:
PATTERN (A B+ C* D)
注意:
- 不支持可能产生空匹配的模式。这类模式的例子有PATTERN (A*)、PATTERN (A? B*), PATTERN (A{0,} B{0,} C*),等等。
- 不能对模式的最后一个变量使用贪婪量词,如 (A B*) 是不支持的,需要增加一个停止条件(until)或否定条件(not)
- 可选的reluctant量词(A??或A{0,1}?)目前不支持。
Concatenation - 模式(A B) 表示A、B之间是严格连续【Strict Contiguity】
Quantifiers - 修改可以映射到模式变量的行数
| 参数 | 说明 |
|---|---|
| * | 0次或多次 |
| + | 1次或多次 |
| ? | 0次或1次 |
| {n} | n次 |
| {n,} | 大于等于n次 |
| {n, m} | 大于等于n次,小于等于m次 |
| {,m} | 小于等于m次 |
每个量词可以是greedy(默认行为)或reluctant
- greedy量词试图匹配尽可能多的行,
- reluctant的量词试图匹配尽可能少的行。
特别是对于流式数据处理,通常要求模式在给定的时间内完成。这允许限制 Flink必须在内部维护的整体状态大小,即使是在贪婪的量词的情况下。
因此,Flink SQL支持附加的(非标准SQL) WITHIN clause,用于为模式定义时间约束。WITHIN clause可以在PATTERNclause之后定义,并以毫秒为间隔进行解析。
如果一个潜在匹配的第一个和最后一个事件之间的时间比给定值长,那么这样的匹配将不会被追加到结果表中。
注意:通常鼓励使用WITHIN子句,因为它帮助Flink实现高效的内存管理。一旦达到阈值,底层状态就可以被修剪。
使用WITH Clause的SQL示例如下:
该SQL检测到在1小时间隔内发生的价格下降10
SELECt *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.rowtime AS dropTime,
A.price - C.price AS dropDiff
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
DEFINE
B AS B.price > A.price - 10
C AS C.price < A.price - 10
)
2.3. Output Mode
附录
FlinkCEP sql match_recognize
Flink CEP SQL详解
match_recognize



