
Flink Table: Reading and Writing HBase
  • 1. Obtain the table environment
  • 2. Create the HBase table connections
  • 3. Reading from, computing over, and writing to HBase
    • 3.1 Reading the HBase detail table and computing with Flink SQL
    • 3.2 Writing to the HBase aggregate table
  • 4. pom
  • 5. Demo code

1. Obtain the table environment

Using batch processing as the example:

        // Batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
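The article uses batch mode throughout; a streaming job differs only in the environment setup. A minimal sketch, assuming the same Flink 1.12 / Blink planner APIs (StreamTableEnvironment comes from org.apache.flink.table.api.bridge.java):

        // Streaming variant (sketch): bind the table environment to a stream environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);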
2. Create the HBase table connections
        // Detail (DWD) table
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE DWD_YDJT_SCALEANDINC_DATE (" +
                        " rowkey STRING," +
                        " info ROW<channelSource STRING, businessDate STRING, businessIncome DOUBLE, businessScale DOUBLE>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'DWD_YDJT_SCALEANDINC_DATE' ," +
                        " 'zookeeper.quorum' = 'linux121:2181,linux122:2181,linux123:2181'" +
                        " )");
        // Aggregate (DWS) table
        TableResult dwsTableResult = tableEnv.executeSql(
                "CREATE TABLE DWS_YDJT_SCALEANDINC_DATE (" +
                        " rowkey STRING," +
                        " info ROW<channelSource STRING, businessDate STRING, businessIncome DOUBLE, businessScale DOUBLE>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'DWS_YDJT_SCALEANDINC_DATE' ," +
                        " 'zookeeper.quorum' = 'linux121:2181'" +
                        " )");

Note: 'connector' = 'hbase-2.2'. The requirement on the actual HBase version is not strict; see the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/overview/
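Note also that this DDL only registers the tables in Flink's catalog; the HBase tables themselves, with the info column family, must already exist. A minimal sketch of pre-creating one with the HBase 2.x client API (hbase-client is already in the pom below; host names follow the DDL above):

        // Sketch: pre-create the HBase table with the 'info' column family.
        // Imports: org.apache.hadoop.hbase.{HBaseConfiguration, TableName},
        //          org.apache.hadoop.hbase.client.*
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux121,linux122,linux123");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("DWS_YDJT_SCALEANDINC_DATE");
            if (!admin.tableExists(name)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                        .build());
            }
        }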

3. Reading from, computing over, and writing to HBase

3.1 Reading the HBase detail table and computing with Flink SQL

Read the data and create a temporary view dwsTable (the HBase columns live inside the info row, so they are referenced as info.<field>):

        Table table = tableEnv.sqlQuery("select TO_BASE64(info.channelSource || info.businessDate), info.channelSource as channelSource, info.businessDate as businessDate, sum(info.businessIncome) as businessIncome, sum(info.businessScale) as businessScale FROM DWD_YDJT_SCALEANDINC_DATE group by info.channelSource, info.businessDate");
        tableEnv.createTemporaryView("dwsTable", table);
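Here TO_BASE64(info.channelSource || info.businessDate) just derives a deterministic rowkey from the grouping key. For example (values hypothetical), TO_BASE64('app' || '20210101') evaluates to 'YXBwMjAyMTAxMDE='.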
3.2 Writing to the HBase aggregate table
        //        INSERT INTO hTable
        //        SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
        TableResult executeResult = tableEnv.executeSql("insert into DWS_YDJT_SCALEANDINC_DATE " +
                "select TO_BASE64(channelSource || businessDate), ROW(channelSource, businessDate, businessIncome, businessScale) FROM dwsTable");

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/hbase/#%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8-hbase-%E8%A1%A8

4. pom


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinksqlConnHbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- the three property names below were lost in extraction and are inferred; the values are original -->
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>

    <dependencies>
        <!-- legacy HBase connector (superseded by flink-connector-hbase-2.2 below) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.1</version>
        </dependency>

        <!-- note: the artifacts below pin older Flink versions (1.11.1) than ${flink.version};
             aligning them to ${flink.version} is advisable -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.2</version>
        </dependency>

        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- HBase connector and client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <!-- Table planners -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- CSV format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>


5. Demo code
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HbaseTest {
    public static void main(String[] args) throws Exception {
        // Batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Detail (DWD) table
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE DWD_YDJT_SCALEANDINC_DATE (" +
                        " rowkey STRING," +
                        " info ROW<channelSource STRING, businessDate STRING, businessIncome DOUBLE, businessScale DOUBLE>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'DWD_YDJT_SCALEANDINC_DATE' ," +
                        " 'zookeeper.quorum' = 'linux121:2181,linux122:2181,linux123:2181'" +
                        " )");
        // Aggregate (DWS) table
        TableResult dwsTableResult = tableEnv.executeSql(
                "CREATE TABLE DWS_YDJT_SCALEANDINC_DATE (" +
                        " rowkey STRING," +
                        " info ROW<channelSource STRING, businessDate STRING, businessIncome DOUBLE, businessScale DOUBLE>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'DWS_YDJT_SCALEANDINC_DATE' ," +
                        " 'zookeeper.quorum' = 'linux121:2181'" +
                        " )");

        // Sanity check: can we read data from HBase?
//        Table table = tableEnv.sqlQuery("SELECT rowkey, info FROM DWD_YDJT_SCALEANDINC_DATE");

        Table table = tableEnv.sqlQuery("select TO_BASE64(info.channelSource || info.businessDate), info.channelSource as channelSource, info.businessDate as businessDate, sum(info.businessIncome) as businessIncome, sum(info.businessScale) as businessScale FROM DWD_YDJT_SCALEANDINC_DATE group by info.channelSource, info.businessDate");
        tableEnv.createTemporaryView("dwsTable", table);

        //        INSERT INTO hTable
        //        SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
        TableResult executeResult = tableEnv.executeSql("insert into DWS_YDJT_SCALEANDINC_DATE " +
                "select TO_BASE64(channelSource || businessDate), ROW(channelSource, businessDate, businessIncome, businessScale) FROM dwsTable");
        executeResult.await(); // wait for the INSERT job to finish (TableResult#await, Flink 1.12+)

        // executeSql on an INSERT returns only the job handle / affected-row count;
        // to inspect the aggregated data itself, execute the query directly.
        TableResult queryResult = table.execute();

        // Iterator over the query result
        CloseableIterator<Row> collect = queryResult.collect();

        // Output: print() or the Consumer below consumes the iterator's data; keep only one of the two
//        queryResult.print();

        List dwdList = new ArrayList<>();

        //结构遍历解析
        collect.forEachRemaining(new Consumer() {
            @Override
            public void accept(Row row) {
                System.out.println(".......Consumer.........");
                String field0 = String.valueOf(row.getField(0));
                String[] user_movie = String.valueOf(row.getField(1)).split(",");
//                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                dwdList.add(new DwdYdjtScaleAndIncDate(user_movie[0],user_movie[1],user_movie[1],user_movie[1],user_movie[1]));
            }
        });


        System.out.println("................");

        for(DwdYdjtScaleAndIncDate um : dwdList){
            System.out.println(um);
        }

//        env.execute("OutputTableDemo"); // not needed: executeSql()/execute() already submit the jobs
    }
}
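The demo references a DwdYdjtScaleAndIncDate class that the article never shows. A minimal sketch so the demo compiles; the field names are assumed from the table schema:

// Hypothetical POJO; field names assumed from the HBase table schema.
public class DwdYdjtScaleAndIncDate {
    private final String rowkey;
    private final String channelSource;
    private final String businessDate;
    private final String businessIncome;
    private final String businessScale;

    public DwdYdjtScaleAndIncDate(String rowkey, String channelSource, String businessDate,
                                  String businessIncome, String businessScale) {
        this.rowkey = rowkey;
        this.channelSource = channelSource;
        this.businessDate = businessDate;
        this.businessIncome = businessIncome;
        this.businessScale = businessScale;
    }

    @Override
    public String toString() {
        return "DwdYdjtScaleAndIncDate{rowkey='" + rowkey + "', channelSource='" + channelSource
                + "', businessDate='" + businessDate + "', businessIncome='" + businessIncome
                + "', businessScale='" + businessScale + "'}";
    }
}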


