栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkSQL流式关联Hbase大表方案(走二级索引)

FlinkSQL流式关联Hbase大表方案(走二级索引)

  我们在做实时数据开发的时候,通常要用spark、flink去消费kafka的数据,拿到数据流后会和外部数据库(Hbase、MySQL等)进行维表关联来把数据流打宽。当然了,有些外部数据库不只是存储维度数据,也会有很多事实数据,并且这些数据更新频繁,数据量巨大,但是我们的Flink流也会去实时的join这些巨大的事实表,这就需要选择一个合适的外部数据库作为支持,这个外部数据库一定要满足海量数据高效的读写性能,这样才能满足实时场景的需求,说到这,我们的目光自然而然的落到了Hbase上,来吧,我们直接上图,下面这张图就是以上所说场景的一个基本架构

那么问题来了,FlinkSQL如何去关联Hbase大表呢,如果关联字段不是hbase维表的rowkey那么将会触发全表扫描,如果这个表很大,全表扫描效率就很不乐观了,耗时少则几秒,多则无限延长,所以我们一定是要走hbase二级索引的,但是很遗憾,FlinkSQL里的Hbase connector不会处理索引,它要么scan,要么就get,那么我们该怎么办呢,别急,我们也有笨办法,那就是我们自己维护索引表,如果你还不懂hbase二级索引的实现方式请自行补充这方面知识,下面的内容就是有关二级索引的试用了,看图吧

 来,我描述一下上图的流程,首先消费到kafka数据后我们的流不能直接去join hbase的数据表而是要先去join索引表,这样就拿到了数据表的rowkey,然后我们再join数据表,这样就不会触发全表扫描了,而是通过rowkey查询,效率就一下子有了质的提升,那么代码是怎么实现呢?上案例!稍等,我先说一下这个案例,我们需要在Flink流中创建三个表:kafka表、hbase维度索引表、hbase维度表,然后我们就可以愉快的写SQL了

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //接入socket流,测试数据如下
        //{'name':'kafka_tb','type':'INSERT','new':{'id':'1','name':'lxz'}}
        DataStream dataStream = env.socketTextStream("localhost", 9999);
        //定义kafka_tb表类型(有序)
        TypeInformation[] kafka_tb_types = new TypeInformation[]{Types.STRING,Types.STRING};
        RowTypeInfo kafka_tb_rowType = new RowTypeInfo(kafka_tb_types);
        //socket接收到的流转换后注册成kafka_tb表
        DataStream ds = dataStream.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String type = JSON.parseObject(value).getString("type");
                JSonObject new_row = JSON.parseObject(value).getJSonObject("new");
                switch (type) {
                    case "INSERT":
                        out.collect(Row.ofKind(RowKind.INSERT, new_row.getString("id"), new_row.getString("name")));break;
                }
            }
        }).returns(kafka_tb_rowType);
        //注册kafka表kafka_tb
        Schema schema01 = Schema.newBuilder().build();
        Table tab1 = tEnv.fromChangelogStream(ds,schema01).as("id","name");
        tEnv.createTemporaryView("kafka_tb", tab1);
        //注册Hbase索引表hbase_index_tb
        tEnv.executeSql("CREATE TABLE hbase_index_tb (n" +
                " ID STRING,n" +
                " CF ROW,n" +
                " PRIMARY KEY (ID) NOT ENFORCEDn" +
                ") WITH (n" +
                " 'connector' = 'hbase-2.2',n" +
                " 'table-name' = 'hbase_index_tb',n" +
                " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',n" +
                " 'zookeeper.znode.parent' = '/hbase-unsecure'n"+
                ")");
        //注册Hbase数据表hbase_data_tb
        tEnv.executeSql("CREATE TABLE hbase_data_tb (n" +
                " ID STRING,n" +
                " CF ROW,n" +
                " PRIMARY KEY (ID) NOT ENFORCEDn" +
                ") WITH (n" +
                " 'connector' = 'hbase-2.2',n" +
                " 'table-name' = 'hbase_data_tb',n" +
                " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',n" +
                " 'zookeeper.znode.parent' = '/hbase-unsecure'n"+
                ")");
        //执行关联查询
        tEnv.executeSql("select a.* " +
                "from hbase_data_tb a " +
                "join hbase_index_tb b " +
                "on a.ID = b.ID " +
                "join kafka_tb c " +
                "on  c.name=b.NAME").print();
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735227.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号