ksqlDB 是学习和开发 kafka 流式计算的很方便的工具。它支持 Push Query 和 Pull Query。下面是一些 Pull Query 的测试。
测试对象我建立了下面的 stream 作为测试对象:
CREATE OR REPLACe STREAM tagvalue (tagId INT, value DOUBLE) WITH (kafka_topic='tagvalue', value_format='json', partitions=1);插入数据
INSERT INTO tagvalue (tagId, value) VALUES (1, 11000); INSERT INTO tagvalue (tagId, value) VALUES (2, 10000);执行 Pull Query 直接对 stream 执行 pull query
SELECT * FROM tagvalue;
系统如下。系统要求必须有 where 语句。
增加 where 语句:
SELECt * FROM tagvalue WHERe tagId = 1;
系统提示如下。系统说我们的 stream 没有主键。
由于不可能修改 stream 的 schema,我们使用系统推荐的方法,改变如下配置为 true,再次执行查询得到如下提示:
系统的提示是 不能对 stream 执行 pull query.
创建基于 tagvalue topic 的 table:
CREATE OR REPLACe TABLE tagvalueview (tagId INT PRIMARY KEY, value DOUBLE) WITH (kafka_topic='tagvalue', value_format='json', partitions=1);
执行 pull query:
select * from tagvalueview;
得到如下结果。系统不能直接查询基于 topic 创建的 table
按照系统提示创建能查询的 table:
CREATE TABLE QUERYABLE_TAGVALUEVIEW AS SELECt * FROM TAGVALUEVIEW
这时候系统增加了一个新的topic:
select * from QUERYABLE_TAGVALUEVIEW ;
以下是执行结果。我们可以看到,系统什么都没返回:
我们再创建一个实时统计 tag 数值数量的 table:
CREATE OR REPLACe TABLE tagvalueview AS SELECT tagId, count(*) FROM tagvalue GROUP BY tagId EMIT CHANGES;
执行以下查询:
select * from tagvalueview;
得到查询结果:
Pull Query 只能在 table 上执行,而且是 queryable table. Pull Query 结合 table 可以帮助开发者统计已有数据的结果。



