栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink查询关联Hbase输出

Flink查询关联Hbase输出

1、前言

大家在开发Flink的时候,很多时候会把某些固定的维度信息存储在Hbase或者Redis等这些第三方库里,已方便来跟流发生关联查询输出。本文将从如何本地搭建一个Hbase环境开始讲起,到如何构建一个Hbase公共调用类,到如何构建一个异步调用Hbase的公共代码,再到最后实际调用代码后的输出。

2、本地利用Docker搭建Hbase环境

 本地如何搭建Docker环境,之前一篇博客中已经详细描述过,大家如果想学习如何搭建,可以去看下,地址如下:Docker入门-Windows 10&Mac系统安装_一个数据小开发的博客-CSDN博客一、何为DockerDocker 是一个开源的应用容器引擎,基于Go语言并遵从 Apache2.0 协议开源。Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口(类似 iPhone 的 app),更重要的是容器性能开销极低。runc 是一个 Linux 命令行工具,用于根据 OCI容器运行时规范 创建和运行容器。containerd 是一个守护程序https://blog.csdn.net/Aaron_ch/article/details/115559512?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164568960216780271525279%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164568960216780271525279&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-115559512.nonecase&utm_term=docker&spm=1018.2226.3001.4450

2.1、下载Hbase镜像
docker pull harisekhon/hbase
2.2、启动Hbase镜像:
docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16020:16020 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase

Hbase 端口映射图:

2.3、本地添加host

打开/etc/hosts ,如下图所示,添加一行

127.0.0.1 myhbase

此时在浏览器中输入hbase访问的地址,可以看到hbase的页面

http://localhost:16010/master-status

 3、访问Hbase 3.1、图形化工具访问Hbase

如下的图形化工具,Mac和Windows都可以访问。

 如果需要图形化工具的,评论区留下邮箱,本人看到了会第一时间发送

3.2、Java工具类访问

直接上代码,可以直接使用

package com.horse.utils.hbase;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;


@Slf4j
public class HbaseUtils {

    public static Connection getHbaseConnection() {

        try {
            // 1.获取配置文件信息
            Configuration configuration = HbaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "localhost");
//            configuration.set("hbase.zookeeper.property.clientPort", "2181");//可写可不写,默认为2181

            // 2.创建连接对象
            return ConnectionFactory.createConnection(configuration);

        } catch (IOException e) {
            log.error("获取Hbase相关基础信息失");
            e.printStackTrace();
        }
        return null;
    }

//    static {
//        try {
//            // 1.获取配置文件信息
//            Configuration configuration = HbaseConfiguration.create();
//            configuration.set("hbase.zookeeper.quorum", "localhost");
//            configuration.set("hbase.zookeeper.property.clientPort", "2181");//可写可不写,默认为2181
//
//            // 2.创建连接对象
//            connection = ConnectionFactory.createConnection(configuration);
//
//            // 3.创建Admin对象
//            admin = connection.getAdmin();
//
//        } catch (IOException e) {
//            log.error("获取Hbase相关基础信息失");
//            e.printStackTrace();
//        }
//    }

    
    public static boolean isTableExist(String tableName, Admin admin) throws IOException {
        // 1.判断表是否存在
        boolean exists = admin.tableExists(TableName.valueOf(tableName));

        // 2.返回结果
        return exists;
    }


    
    public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs) throws IOException {

        //1、判断传入的表是否正确
        if (tableName.split(":").length != 2) {
            log.error("{},传入的表名有问题,需要传入 nameSpace:tableName 这种格式", tableName);
            return false;
        }

        String nameSpace = tableName.split(":")[0];

        //2、判断命名空间相关信息
        if (!isNameSpaceExist(admin, nameSpace)) {
            if (createNameSpaceIfExists) {
                log.info("{},该命名空间不存在,开始创建该命名空间", nameSpace);
                createNameSpace(admin, nameSpace);
            } else {
                log.error("{},该命名空间不存在", nameSpace);
                return false;
            }
        }

        // 3、判断表是否存在
        if (isTableExist(tableName, admin)) {
            log.error("{},表已存在", tableName);
            return false;
        }

        // 4、判断是否存在列族信息
        if (cfs.length <= 0) {
            log.info("请设置列族信息!");
            return false;
        }

        // 5、创建表描述器
        try {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return false;
            }
            //定义表描述对象
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            //遍历列族数组
            for (String cf : cfs) {
                //定义列族描述对象
                ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
                //给表添加列族信息
                tableDescriptorBuilder.setColumnFamily(columnFamily);
            }
            //创建表
            admin.createTable(tableDescriptorBuilder.build());
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        log.info("{},表新建成功", tableName);

        return true;

    }

    
    public static void dropTable(Admin admin, String tableName) throws IOException {

        // 1.判断表是否存在
        if (!isTableExist(tableName, admin)) {
            log.error("{},表不存在", tableName);
            return;
        }

        // 2.使表下线
        admin.disableTable(TableName.valueOf(tableName));

        // 3.删除表
        admin.deleteTable(TableName.valueOf(tableName));
    }

    
    public static boolean isNameSpaceExist(Admin admin, String nameSpace) throws IOException {
        NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
            if (namespaceDescriptor.getName().equals(nameSpace)) {
                return true;
            }
        }
        return false;
    }

    
    public static void createNameSpace(Admin admin, String namespace) {

        // 1.创建命名空间描述器
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();

        // 2.创建命名空间
        try {
            admin.createNamespace(namespaceDescriptor);
        } catch (NamespaceExistException e) {
            log.error("{},命名空间已存在", namespace);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        // 1.获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.创建put对象
        Put put = new Put(Bytes.toBytes(rowKey));

        // 3.给Put对象赋值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));

        // 4.插入数据
        table.put(put);

        // 5.关闭表连接
        table.close();
    }


    
    public static  T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class clz) throws Exception {

        //1、判断传入的参数是否为空或空字符串
        if (StringUtils.isNullOrWhitespaceonly(tableName)) {
            log.error("传入的tableName为空:{}", tableName);
            return null;
        }

        if (StringUtils.isNullOrWhitespaceonly(rowKey)) {
            log.error("传入的rowKey为空:{}", rowKey);
            return null;
        }

        if (StringUtils.isNullOrWhitespaceonly(columnFamily)) {
            log.error("传入的columnFamily为空:{}", columnFamily);
            return null;
        }

        Get get = new Get(Bytes.toBytes(rowKey));

        if (StringUtils.isNullOrWhitespaceonly(columnName)) {
            get.addFamily(Bytes.toBytes(columnFamily));
//            log.info("传入的columnName为空:{},将根据传入的其他信息查询全量的列信息", columnName);
        } else {
            //传入的不为空的话,就查询传入的值
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
//            log.info("传入的columnName为:{},将根据传入的值查询信息", columnName);
        }

        //4、获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        //5、设置获取数据的版本数
        //get.setMaxVersions(5);

        //5、获取数据
        Result resultData = table.get(get);

        //6、关闭表连接
        table.close();

        //创建 泛型对象
        T t = clz.newInstance();

        //给泛型对象赋值

        List cells = resultData.listCells();

        for (Cell cell : cells) {

            String resultColumnName = Bytes.toString(CellUtil.cloneQualifier(cell));
            String resultValue = Bytes.toString(CellUtil.clonevalue(cell));

            //判断是否需要转换为驼峰命名
            if (underScoreToCamel) {

                if (resultColumnName.contains("_")) {
                    resultColumnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else if (resultColumnName.contains("-")) {
                    resultColumnName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else {
                    resultColumnName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_CAMEL, resultColumnName);
                }

            }

            BeanUtils.setProperty(t, resultColumnName, resultValue);
        }

        return t;

    }


    
    private static void scanTable(Connection connection, String tableName, String leftScanRowKey, String rightScanRowKey) throws IOException {


        // 1.获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.构建Scan对象 // 左闭右开
        Scan scan = new Scan(Bytes.toBytes(leftScanRowKey), Bytes.toBytes(rightScanRowKey));

        // 3.扫描表
        ResultScanner resultScanner = table.getScanner(scan);

        // 4.解析resultScanner
        for (Result result : resultScanner) {

            // 5.解析result并打印
            for (Cell cell : result.rawCells()) {

                // 6.打印数据
                System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                        ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                        ",Value:" + Bytes.toString(CellUtil.clonevalue(cell)));
            }
        }

        // 7.关闭表连接
        table.close();
    }


    
    public static void deleteData(Connection connection, String tableName, String rowKey, String cf, String cn) throws IOException {

        // 1.获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.创建删除对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));


        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);

        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);

        // 2.2 删除指定的列族
        // addFamily:删除指定列族的所有列的所有版本数据。
        delete.addFamily(Bytes.toBytes(cf));

        // 3.执行删除操作
        table.delete(delete);

        // 4.关闭连接
        table.close();
    }

    
    public static void close(Admin admin, Connection connection) {

        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }



    public static JSonObject getJSonObject(String rowKey, List cells) {

        
        JSonObject jsonObject = new JSonObject();
        JSonObject valueJsonObject = new JSonObject();

        for (Cell cell : cells) {
            valueJsonObject.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.clonevalue(cell)));
        }

        jsonObject.put("rowKey", rowKey);
        jsonObject.put("value", valueJsonObject);
        return jsonObject;
    }

}

比较核心的一个几个方法做出详细说明下:

3.2.1、新建表函数
public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs)

会根据传入的相关信息进行表的创建,例如:

表名是"aaron1:test1",当"aaron1"命名空间不存在的时候,会根据createNameSpaceIfExists参数来选择是否在不存在的情况下创建,如果需要创建,就会先创建该命名空间后,再去新建表。

public static void main(String[] args) throws Exception{
        createTable(getHbaseConnection().getAdmin(),"aaron1:test1",false,"name","age","id");
    }

当传入的createNameSpaceIfExists是true的时候

public static void main(String[] args) throws Exception{
        createTable(getHbaseConnection().getAdmin(),"aaron1:test1",true,"name","age","id");
    }

在web页面可以查看到已经创建成功

  
3.2.2、插入数据

 根据传入的参数来插入具体的数据。

public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value)
public static void main(String[] args) throws Exception{
        putData(getHbaseConnection(),"aaron1:test1","aaaa","name","schoolName","Nanj");
    }

通过图形化工具可以看到数据已经插入成功

3.2.3、查询数据

本函数可以通过特定的rowKey查询出结果,并把结果转成JavaBean输出

public static  T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class clz)
public static void main(String[] args) throws Exception {
        PassengerInfor passengerInfor = getData(getHbaseConnection(), "aaron:test1", "11111", "first", "", true, PassengerInfor.class);
        log.info("查询出来的结果为:{},", passengerInfor);
    }

4、Flink异步IO访问Hbase

因为异步IO都是通过函数继承RichAsyncFunction这个抽象类,所以为了更大化的使用,先编写一个工具类。

4.1、异步IO访问Hbase工具类
 

源码如下,其中使用的时候,使用到了如上的一些Hbase工具类:

package com.horse.utils.hbase;

import com.horse.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;


@Slf4j
public abstract class HbaseAsyncFunctionUtil extends RichAsyncFunction {

    private Connection connection;
    private ThreadPoolExecutor threadPoolExecutor;

    private Admin hbaseAdmin;

    private Hbase queryHbaseBean;

    private Class clz;

    
    private Table hbaseTable;

    
    private String columnFamily;

    
    private String columnName;

    
    private String tableName;

    public HbaseAsyncFunctionUtil(String tableName, String columnFamily, String columnName, Class clz) {
        this(tableName, columnFamily, clz);
        this.columnName = columnName;
    }

    public HbaseAsyncFunctionUtil(String tableName, String columnFamily, Class clz) {

        this.clz = clz;
        this.tableName = tableName;
        this.columnFamily = columnFamily;
    }

    public abstract String setRowKey(IN input);

    public abstract OUT getList(String rowKey, IN input, Hbase hbaseResult);

    private void setQueryHbaseBean() throws Exception {
        queryHbaseBean = clz.newInstance();
    }

    @Override
    public void asyncInvoke(IN input, ResultFuture resultFuture) throws Exception {

        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });
    }


    @Override
    public void timeout(IN input, ResultFuture resultFuture) throws Exception {
        close();
        connc();
        log.info("Connection to Hbase TimeOut,Now is reConnect!");

        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });

    }

    private OUT getResult(IN input) {

        String rowKey = setRowKey(input);
        try {

            setQueryHbaseBean();

            Hbase hbaseResult = (Hbase) HbaseUtils.getData(connection, tableName, rowKey, columnFamily, columnName, true, queryHbaseBean.getClass());

            return getList(rowKey, input, hbaseResult);

        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //赋值connection
        connc();
    }

    @Override
    public void close() throws Exception {
        super.close();

        if (hbaseTable != null) {
            hbaseTable.close();
        }

        if (hbaseAdmin != null) {
            hbaseAdmin.close();
        }

        if (connection != null) {
            connection.close();
        }
    }

    private void connc() throws IOException {
        connection = HbaseUtils.getHbaseConnection();

        hbaseAdmin = connection.getAdmin();

        if (!hbaseAdmin.tableExists(TableName.valueOf(tableName))) {
            throw new IOException("Query Hbase Table is not exists!");
        }

        hbaseTable = connection.getTable(TableName.valueOf(tableName));

        //初始化
        threadPoolExecutor = ThreadPoolUtil.getThreadPool();
    }
}
4.2、具体使用

同样还是利用nc模拟输入流数据

主类AsyncIOQueryHbase代码如下:

package com.horse.asyncio;

import com.horse.bean.PassengerInfor;
import com.horse.bean.UserLoginLog;
import com.horse.cep.function.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.horse.utils.hbase.HbaseAsyncFunctionUtil;

import java.util.concurrent.TimeUnit;


public class AsyncIOQueryHbase {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator dataStream = socketTextStream.flatMap(new MyFlatMapFunction());

        SingleOutputStreamOperator outputStreamOperator = AsyncDataStream.unorderedWait(dataStream,
                new HbaseAsyncFunctionUtil("aaron:test1", "first", "", PassengerInfor.class) {

                    @Override
                    public String setRowKey(UserLoginLog input) {
                        return input.getUserName();
                    }

                    @Override
                    public PassengerInfor getList(String rowKey, UserLoginLog input, PassengerInfor hbaseResult) {
                        return hbaseResult;
                    }

                }, 1000, TimeUnit.MINUTES, 100);

        outputStreamOperator.print("异步IO查询Hbase结果输出");

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在nc中输入如下数据:

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"11111"}

查看日志:

 可以看到如上图中所示,已经能够从hbase中查询出结果。

如果觉得写的不错的,可以适当表示下哈~本人表示感谢。如果有写的不到位的也可以提问。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745405.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号