栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink入门教程(四)——窗口(二)

Flink入门教程(四)——窗口(二)

欢迎关注公众号——《数据三分钟》

一线大厂的师兄师姐结合自己的工作实践,将数据知识浅显道来,每天三分钟,助你成为数据达人。还有面试指导和内推机会。

 

        上一节将到标准开窗函数在实时流计算中的实现,这一节我们来讲讲window aggregate。我把这两块都放在窗口来写,因为他们本质都是一种计算的触发机制,即在满足一定情况下去启动需要的计算。不同的是window agg被定义在group by语句中,它定义出一个key,其实就是一个窗口的范围了。window agg还可以形象地看成一个一个按照一定规则排列的桶(bucket),而每一个桶就是一个window,就是一个计算单元。

        window agg一般可以分为滑动窗口、滚动窗口、会话窗口。

1、滚动窗口(Tumble window)

        滚动窗口彼此之间没有重叠,且严丝合缝,窗口与窗口之间连续分布。假设划定一个2分钟的滚动窗口,那么窗口的分布大致如下:

        从上图可以看出每隔2分钟会触发一次滚动窗口。在window agg聚合中,我们通常要考虑数据的乱序和延迟问题,因此一般会搭配watermark(水印)去使用(后续章节会详细描述),假设我们需要计算每个商品每分钟的曝光次数,允许有3秒钟的数据延迟,代码如下:

CREATE TABLE tumble_window(
  item_id varchar,
  item_url varchar,
  ts timeStamp,
  WATERMARK wk FOR ts as withOffset(ts, 3000) 
) with (
  type='kafka',
  ...
);

CREATE TABLE tumble_result(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  item_id VARCHAR,
  exps BIGINT
) with (
  type='hbase'
);

INSERT INTO tumble_result
SELECt 
  start_time(ts, INTERVAL '1' MINUTE),
  end_time(ts, INTERVAL '1' MINUTE),
  item_id, 
  COUNT(item_url) as cnt
FROM tumble_window
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), item_id

测试数据:

item_id

item_url

ts

item_1

http://taobao.com/123456

2021-06-18 12:00:00

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:48

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:58

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:01

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:15

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:45

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:56

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:02:25

item_2

http://taobao.com/1234567

2021-06-18 12:02:26

输出结果:

start_time

end_time

item_id

cnt

2021-06-18 12:00:00

2021-06-18 12:01:00

item_1

3

2021-06-18 12:01:00

2021-06-18 12:02:00

item_1

4

2021-06-18 12:02:00

2021-06-18 12:03:00

item_1

1

item_2

1

2、滑动窗口(Hop window)

        滑动窗口和滚动窗口不同在于,滑动窗口要设置两个参数,一个是窗口长度,一个是滑动步长。当滑动步长小于窗口长度时,窗口之间存在重叠;当滑动步长等于窗口长度时,滑动窗口就退化成了滚动窗口;当滑动步长大于窗口长度时,窗口之间就会存在间隙。滑动窗口如下图所示,表示的是一个4min窗口长度,滑动步长为1min的滑动窗口:

        滑动窗口在实践中被经常使用到,譬如计算某网站最近一分钟被浏览次数最多的商品;统计最近10分钟最热门的航班,统计最近5分钟某传感器最大的测量值等等。假设我们还是需要计算每个商品最近1分钟的曝光次数,允许有3秒钟的数据延迟,每30s更新一次输出,代码如下:

CREATE TABLE tumble_window(
  item_id varchar,
  item_url varchar,
  ts timeStamp,
  WATERMARK wk FOR ts as withOffset(ts, 3000) 
) with (
  type='kafka',
  ...
);

CREATE TABLE tumble_result(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  item_id VARCHAR,
  exps BIGINT
) with (
  type='hbase'
);

INSERT INTO tumble_result
SELECt 
  start_time(ts, INTERVAL '1' MINUTE),
  end_time(ts, INTERVAL '1' MINUTE),
  item_id, 
  COUNT(item_url) as cnt
FROM tumble_window
GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), item_id

测试数据如下:

item_id

item_url

ts

item_1

http://taobao.com/123456

2021-06-18 12:00:00

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:48

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:58

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:01

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:15

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:45

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:56

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:02:25

item_2

http://taobao.com/1234567

2021-06-18 12:02:26

输出结果:

start_time

end_time

item_id

cnt

2021-06-18 11:59:30

2021-06-18 12:00:30

item_1

1

2021-06-18 12:00:00

2021-06-18 12:01:00

item_1

2

2021-06-18 12:00:30

2021-06-18 12:01:30

item_1

4

2021-06-18 12:01:00

2021-06-18 12:02:00

item_1

4

2021-06-18 12:01:30

2021-06-18 12:02:30

item_1

3

item_2

1

2021-06-18 12:02:00

2021-06-18 12:03:00

item_1

1

item_2

1

3、会话窗口(Session window)

        会话窗口与前两个窗口相比,第一没有固定的窗口长度,第二没有窗口的重叠,它是根据数据元素的分布间隔进行窗口的切割的,一旦元素的间隔达到设定的长度就会触发窗口计算,下图表示为一个间隔时间2min的会话窗口,也就是说,两分钟没有新的数据元素就会触发窗口计算:

        会话窗口特别适合监测活跃交互次数,假设我们要计算某一个商品的活跃曝光次数,会话间隔是30s:

CREATE TABLE tumble_window(
  item_id varchar,
  item_url varchar,
  ts timeStamp,
  WATERMARK wk FOR ts as withOffset(ts, 3000) 
) with (
  type='kafka',
  ...
);

CREATE TABLE tumble_result(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  item_id VARCHAR,
  exps BIGINT
) with (
  type='hbase'
);

INSERT INTO tumble_result
SELECt 
  start_time(ts, INTERVAL '1' MINUTE),
  end_time(ts, INTERVAL '1' MINUTE),
  item_id, 
  COUNT(item_url) as cnt
FROM tumble_window
GROUP BY SESSION(ts, INTERVAL '30' SECOND), item_id

测试数据如下:

item_id

item_url

ts

item_1

http://taobao.com/123456

2021-06-18 12:00:00

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:48

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:00:58

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:01

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:15

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:45

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:01:56

item_1

淘宝网 - 淘!我喜欢

2021-06-18 12:02:25

item_2

http://taobao.com/1234567

2021-06-18 12:02:26

输出结果如下:

start_time

end_time

item_id

cnt

2021-06-18 12:00:00

2021-06-18 12:00:30

item_1

1

2021-06-18 12:00:48

2021-06-18 12:01:45

item_1

5

2021-06-18 12:01:45

2021-06-18 12:02:56

item_1

3

item_2

1

4、插一段历史

        吴王阖闾仔细阅读了孙武(《孙子兵法》作者)晋献的兵法十三篇,非常敬佩孙武,于是想试探一下他是否有真才实学,就把他诏进王宫,并给他180名宫女操练。孙武领命,将她们分为两队,指定两名吴王宠妃为队长,执黄旗前导。演练开始,队伍一片混乱。孙武严肃宣布:“没有讲清楚,是我为将的过错。”再次说明演练要求、列队动作以及军法纪律以后,进行演练,仍然混乱得很,如是者三。孙武为严肃军纪,要求处斩两名队长,吴王为两名妃子求情,孙子不许,坚持将两位妃子处斩,说到:“将在外,君命有所不受。”吴王无奈,只能痛失两位爱妃。这就是著名的“吴宫练兵”的故事。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/662983.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号