主流取自Kafka, 通过Flink Hbase SQL Connector 从Hbase读取维度数据,进行数据补全。
本文代码基于Flink 1.12.0版本
注册Kafka table。所需的 Maven 依赖：
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
-- Kafka source table for the main (fact) stream.
-- NOTE: `key` is a reserved keyword in Flink SQL and must be escaped with backticks.
-- The proctime computed column is required for the processing-time lookup join below.
CREATE TABLE fact (
    `key` STRING,
    value1 STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-hb',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
注册Hbase table
-- HBase dimension table. Each HBase column family maps to a ROW type whose
-- fields are the column qualifiers; the lookup query reads hb_stream.cf.name,
-- so the ROW must declare that field (a bare `ROW` with no fields is invalid).
CREATE TABLE hb_stream (
    rowkey STRING,
    cf ROW<name STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'test',
    'zookeeper.quorum' = 'localhost:12181'
);
如果Hbase表有多个column family,可以如下定义:
CREATE TABLE hTable (
    rowkey STRING,
    family1 ROW<q1 STRING>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'test',
    'zookeeper.quorum' = 'localhost:2181'
);
查询维度数据
select * from fact left join hb_stream FOR SYSTEM_TIME AS OF fact.proctime on fact.`key` = hb_stream.rowkey and fact.value1 = hb_stream.cf.name;
完整代码
/**
 * Demo: enrich a Kafka main stream with dimension data looked up from HBase
 * via a processing-time temporal (lookup) join, using the Flink HBase SQL connector.
 *
 * <p>Requires a Kafka broker on localhost:9092 and an HBase cluster whose
 * ZooKeeper quorum listens on localhost:12181. Based on Flink 1.12.0.
 */
public class HbaseDimDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // Kafka source table. `key` is a reserved keyword in Flink SQL and must be
        // backtick-escaped; the proctime attribute is required for the lookup join.
        String kafkaDdl = "CREATE TABLE fact(\n" +
                "  `key` STRING,\n" +
                "  value1 STRING,\n" +
                "  proctime AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test-hb',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";

        // HBase dimension table. The column family 'cf' maps to a ROW type that must
        // declare every qualifier the query reads (cf.name below); a bare ROW is invalid.
        String hbaseDdl = "CREATE TABLE hb_stream(\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<name STRING>,\n" +
                "  PRIMARY KEY (rowkey) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'hbase-2.2',\n" +
                "  'table-name' = 'test',\n" +
                "  'zookeeper.quorum' = 'localhost:12181'\n" +
                ")";

        tenv.executeSql(kafkaDdl);
        tenv.executeSql(hbaseDdl);

        // The join condition of a lookup join against HBase must include the rowkey.
        Table table = tenv.sqlQuery(
                "select * from fact " +
                "left join hb_stream FOR SYSTEM_TIME AS OF fact.proctime " +
                "on fact.`key` = hb_stream.rowkey and fact.value1 = hb_stream.cf.name");

        tenv.toRetractStream(table, Row.class).print();
        table.printSchema();
        env.execute();
    }
}
适用场景
- 主流数据量大，维度表需要提供高并发的查询服务；当维度数据量级很大时，使用普通关系数据库承载会导致查询性能低下，此时适合用HBase作为维度表存储。



