栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性

Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性

ksqlDB 是学习和开发 kafka 流式计算的很方便的工具。它支持 Push Query 和 Pull Query。下面是一些 Pull Query 的测试。

测试对象

我建立了下面的 stream 作为测试对象:

CREATE OR REPLACe STREAM tagvalue (tagId INT, value DOUBLE)
  WITH (kafka_topic='tagvalue', value_format='json', partitions=1);
插入数据
INSERT INTO tagvalue (tagId, value) VALUES (1, 11000);
INSERT INTO tagvalue (tagId, value) VALUES (2, 10000);
执行 Pull Query 直接对 stream 执行 pull query
SELECT *
FROM tagvalue;

系统如下。系统要求必须有 where 语句。

增加 where 语句:

SELECt *
FROM tagvalue
WHERe tagId = 1;

系统提示如下。系统说我们的 stream 没有主键。

由于不可能修改 stream 的 schema,我们使用系统推荐的方法,改变如下配置为 true,再次执行查询得到如下提示:


系统的提示是 不能对 stream 执行 pull query.

对 table 执行 pull query

创建基于 tagvalue topic 的 table:

CREATE OR REPLACe TABLE tagvalueview (tagId INT PRIMARY KEY, value DOUBLE)
  WITH (kafka_topic='tagvalue', value_format='json', partitions=1);

执行 pull query:

select *
from tagvalueview;

得到如下结果。系统不能直接查询基于 topic 创建的 table

按照系统提示创建能查询的 table:

CREATE TABLE QUERYABLE_TAGVALUEVIEW AS SELECt * FROM TAGVALUEVIEW

这时候系统增加了一个新的topic:

select *
from QUERYABLE_TAGVALUEVIEW ;

以下是执行结果。我们可以看到,系统什么都没返回:


我们再创建一个实时统计 tag 数值数量的 table:

CREATE OR REPLACe TABLE tagvalueview AS
  SELECT tagId, count(*)
  FROM tagvalue
  GROUP BY tagId
  EMIT CHANGES;

执行以下查询:

select *
from tagvalueview;

得到查询结果:

结论

Pull Query 只能在 table 上执行,而且是 queryable table. Pull Query 结合 table 可以帮助开发者统计已有数据的结果。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/329379.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号