Querying an HBase Dimension Table with Flink SQL

Background

The main stream is consumed from Kafka; dimension data is read from HBase through the Flink HBase SQL Connector to enrich each record.
The code in this article is based on Flink 1.12.0.

Add the dependency
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
Register the Kafka table
CREATE TABLE fact(
                key STRING,
                value1 STRING,
                proctime AS PROCTIME()
                ) WITH (
                'connector' = 'kafka',
                'topic' = 'test-hb',
                'properties.bootstrap.servers' = 'localhost:9092',
                'properties.group.id' = 'testGroup',
                'scan.startup.mode' = 'latest-offset',
                'format' = 'json'
                );
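For reference, a message on the test-hb topic is plain JSON whose keys match the columns declared in the fact table; proctime is computed by Flink and is not part of the message. The concrete values below are hypothetical, and this small snippet only builds and prints such a payload:

```java
public class FactPayload {
    public static void main(String[] args) {
        // Hypothetical message for the test-hb topic. The JSON keys must match the
        // column names declared in the fact table: key and value1.
        // proctime is derived via PROCTIME() and is never serialized in the message.
        String payload = "{\"key\":\"row1\",\"value1\":\"Bob\"}";
        System.out.println(payload);
    }
}
```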
Register the HBase table
CREATE TABLE hb_stream(
                rowkey STRING,
                cf ROW<name STRING>,
                PRIMARY KEY (rowkey) NOT ENFORCED) WITH (
                'connector' = 'hbase-2.2',
                'table-name' = 'test',
                'zookeeper.quorum' = 'localhost:12181'
                );

If the HBase table has multiple column families, it can be defined as follows:

-- the qualifier names inside each ROW are illustrative
CREATE TABLE hTable (
 rowkey STRING,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'test',
 'zookeeper.quorum' = 'localhost:2181'
);
Query the dimension data
SELECT * FROM fact
LEFT JOIN hb_stream
FOR SYSTEM_TIME AS OF fact.proctime
ON fact.key = hb_stream.rowkey
AND fact.value1 = hb_stream.cf.name;
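Conceptually, this lookup join probes HBase with fact.key at processing time and then applies the remaining predicate. A minimal plain-Java sketch of those semantics (a HashMap stands in for the HBase table; all names and values are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class LookupJoinSketch {
    public static void main(String[] args) {
        // Stand-in for the HBase dimension table: rowkey -> cf.name.
        Map<String, String> dim = new HashMap<>();
        dim.put("row1", "Bob");

        // One fact record arriving from Kafka (hypothetical values).
        String key = "row1";
        String value1 = "Bob";

        // FOR SYSTEM_TIME AS OF fact.proctime: look up the current dimension row
        // by rowkey, then apply the extra predicate fact.value1 = hb_stream.cf.name.
        String name = dim.get(key);
        boolean matched = name != null && name.equals(value1);

        // LEFT JOIN: the fact row is emitted either way; dimension columns are null on a miss.
        System.out.println(key + "," + value1 + "," + (matched ? name : "null"));
    }
}
```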
Full code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class HbaseDimDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, environmentSettings);

        String kafkaDdlString = "CREATE TABLE fact(\n" +
                "    key STRING,\n" +
                "    value1 STRING,\n" +
                "    proctime AS PROCTIME()\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'test-hb',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'properties.group.id' = 'testGroup',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'format' = 'json'\n" +
                ")";

        String ddlString = "CREATE TABLE hb_stream(\n" +
                "    rowkey STRING,\n" +
                "    cf ROW<name STRING>,\n" +
                "    PRIMARY KEY (rowkey) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'hbase-2.2',\n" +
                "    'table-name' = 'test',\n" +
                "    'zookeeper.quorum' = 'localhost:12181'\n" +
                ")";

        tenv.executeSql(kafkaDdlString);
        tenv.executeSql(ddlString);

        // The join condition must include the rowkey
        Table table = tenv.sqlQuery("select * from fact " +
                "left join hb_stream FOR SYSTEM_TIME AS OF fact.proctime on fact.key = hb_stream.rowkey and fact.value1 = hb_stream.cf.name");
        tenv.toRetractStream(table, Row.class).print();
        table.printSchema();

        env.execute();
    }
}
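By default the HBase lookup source issues a point query against HBase for every incoming fact record. If that becomes a bottleneck, the connector's lookup cache can be enabled in the DDL. The option names below are the lookup options documented for the Flink HBase connector (verify them against your Flink version); the values are illustrative:

```sql
CREATE TABLE hb_stream(
                rowkey STRING,
                cf ROW<name STRING>,
                PRIMARY KEY (rowkey) NOT ENFORCED) WITH (
                'connector' = 'hbase-2.2',
                'table-name' = 'test',
                'zookeeper.quorum' = 'localhost:12181',
                -- illustrative cache settings: each looked-up row is reused for up to 10 minutes
                'lookup.cache.max-rows' = '10000',
                'lookup.cache.ttl' = '10min'
                );
```

The trade-off is freshness: with the cache enabled, updates in HBase are only picked up after the TTL expires.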
Use cases
    The main stream is high-volume, so the dimension table must serve lookups with high concurrency.
    The dimension data itself is large; hosting it in an ordinary relational database would lead to poor lookup performance.