栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink sql 时态和和静态表的理解

flink sql 时态和和静态表的理解

静态表

flink sql 定义的维度表,一般程序启动的时候将维度即信息一次性缓存到内存中,下次流数据与之关联的时候,实际是与内存中数据进行关联查询
样例SQL

create table source_kafka (
id bigint,
name string,
proctime as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);


CREATE TABLE source_hbase (
  id string,
  cf ROW ,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'zyd_test',
'zookeeper.quorum' = 'xxx'
);

create table sink_print(
id bigint,
name string
)
 WITH (
'connector' = 'print'
);

insert into sink_print
select 
t1.id,t2.name
from
source_kafka t1
left join 
source_hbase t2
on t1.id=cast(t2.id as bigint);

kafka 测试用例,第二条数据是修改hbase后发送的

hbase操作

flink 输出日志

hbase中再增加一条数据

kafka中也发送了

flink 结果

动态表

针对以上问题,维度发生更新了怎么办,需要引入动态表,锁定事件的时间

create table source_kafka (
id bigint,
name string,
proctime as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);


CREATE TABLE source_hbase (
  id string, --客户id
  cf ROW , --预约id
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'zyd_test',
'zookeeper.quorum' = 'xxx'
);

create table sink_print(
id bigint,
name string
)
 WITH (
'connector' = 'print'
);

insert into sink_print
select 
t1.id,t2.name
from
source_kafka t1
left join 
source_hbase for system_time as of proctime as t2
on cast( t1.id as string)=t2.id ;




换种写法报错

insert into sink_print
select 
t1.id,t2.name
from
source_kafka t1
left join 
source_hbase for system_time as of proctime as t2
on t1.id =cast(t2.id as bigint);

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal table join requires equivalent condition of the same type, but the condition is id[BIGINT]=id[STRING NOT NULL]
可以发现 flink sql 关联动态表的时候,其实先将流表数据与动态表关联,再去flink sql 中逻辑处理,所以报错类型不匹配

这样流的数据量过大,对于维表不是有很大的io消耗么,所以flink sql 使用了lookupcache的概念,那就是加缓存

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773894.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号