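When developing Flink jobs, it is common to keep fixed dimension data in an external store such as HBase or Redis so that it can be joined against the stream at query time. This post starts with setting up a local HBase environment, then builds a shared HBase utility class, then a reusable class for calling HBase asynchronously, and ends with the output of an actual run.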
2. Setting up a local HBase environment with Docker
Setting up Docker itself is covered in detail in an earlier post. If you need it, see: Docker入门-Windows 10&Mac系统安装 (Getting Started with Docker: Installing on Windows 10 and Mac), 一个数据小开发的博客, CSDN
https://blog.csdn.net/Aaron_ch/article/details/115559512?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164568960216780271525279%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164568960216780271525279&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-115559512.nonecase&utm_term=docker&spm=1018.2226.3001.4450
2.1. Pull the HBase image
docker pull harisekhon/hbase
2.2. Start the HBase container:
docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16020:16020 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase
HBase port mapping diagram:
2.3. Add a local hosts entry
Open /etc/hosts and add the following line:
127.0.0.1 myhbase
Now open the HBase address in a browser and the HBase Master page should appear:
http://localhost:16010/master-status
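If the page does not load, it can help to check that the mapped ports are reachable at all. The following is a minimal sketch using only the JDK; the class PortCheck and the method isPortOpen are hypothetical helpers, not part of the original project, and the port list is taken from the docker run command above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (an assumption, not from the original post):
// reports whether a TCP port accepts connections within a timeout.
class PortCheck {
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host name could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 = ZooKeeper, 16000 = HBase Master RPC, 16010 = HBase Master web UI
        int[] ports = {2181, 16000, 16010};
        for (int port : ports) {
            System.out.println("myhbase:" + port + " open = " + isPortOpen("myhbase", port, 1000));
        }
    }
}
```

A result of false for every port usually points to the container not running or the hosts entry missing, rather than to a problem inside HBase.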
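3. Accessing HBase
3.1. Accessing HBase with a GUI tool
The GUI tool shown here works on both Mac and Windows.
If you need the GUI tool, leave your email address in the comments and I will send it as soon as I see it.
3.2. Accessing HBase from a Java utility class
Here is the code; it can be used as-is.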
package com.horse.utils.hbase;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
@Slf4j
public class HbaseUtils {
    // Creates a new HBase connection; returns null if the connection cannot be established
    public static Connection getHbaseConnection() {
        try {
            // 1. Build the client configuration
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "localhost");
            // configuration.set("hbase.zookeeper.property.clientPort", "2181"); // optional, defaults to 2181
            // 2. Create the connection
            return ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            log.error("Failed to obtain the HBase connection", e);
        }
        return null;
    }
    // Alternative kept for reference: initialize a shared connection and Admin once in a static block
    // static {
    //     try {
    //         // 1. Build the client configuration
    //         Configuration configuration = HBaseConfiguration.create();
    //         configuration.set("hbase.zookeeper.quorum", "localhost");
    //         configuration.set("hbase.zookeeper.property.clientPort", "2181"); // optional, defaults to 2181
    //
    //         // 2. Create the connection
    //         connection = ConnectionFactory.createConnection(configuration);
    //
    //         // 3. Create the Admin object
    //         admin = connection.getAdmin();
    //
    //     } catch (IOException e) {
    //         log.error("Failed to obtain the HBase connection", e);
    //     }
    // }
    public static boolean isTableExist(String tableName, Admin admin) throws IOException {
        // Returns true if the table exists
        return admin.tableExists(TableName.valueOf(tableName));
    }
    public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs) throws IOException {
        // 1. Validate the table name format
        String[] nameParts = tableName.split(":");
        if (nameParts.length != 2) {
            log.error("{}: invalid table name, expected the form nameSpace:tableName", tableName);
            return false;
        }
        String nameSpace = nameParts[0];
        // 2. Check the namespace. Despite its name, createNameSpaceIfExists means
        //    "create the namespace if it does not exist yet"
        if (!isNameSpaceExist(admin, nameSpace)) {
            if (createNameSpaceIfExists) {
                log.info("{}: namespace does not exist, creating it", nameSpace);
                createNameSpace(admin, nameSpace);
            } else {
                log.error("{}: namespace does not exist", nameSpace);
                return false;
            }
        }
        // 3. Check whether the table already exists
        if (isTableExist(tableName, admin)) {
            log.error("{}: table already exists", tableName);
            return false;
        }
        // 4. Require at least one column family
        if (cfs.length <= 0) {
            log.error("Please specify at least one column family!");
            return false;
        }
        // 5. Build the table descriptor and create the table
        try {
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            for (String cf : cfs) {
                // Add one column family descriptor per requested family
                ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
                tableDescriptorBuilder.setColumnFamily(columnFamily);
            }
            admin.createTable(tableDescriptorBuilder.build());
        } catch (Exception e) {
            log.error("{}: failed to create table", tableName, e);
            return false;
        }
        log.info("{}: table created", tableName);
        return true;
    }
    public static void dropTable(Admin admin, String tableName) throws IOException {
        // 1. Check whether the table exists
        if (!isTableExist(tableName, admin)) {
            log.error("{}: table does not exist", tableName);
            return;
        }
        // 2. Disable the table (required before deletion)
        admin.disableTable(TableName.valueOf(tableName));
        // 3. Delete the table
        admin.deleteTable(TableName.valueOf(tableName));
    }
    public static boolean isNameSpaceExist(Admin admin, String nameSpace) throws IOException {
        NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
            if (namespaceDescriptor.getName().equals(nameSpace)) {
                return true;
            }
        }
        return false;
    }
    public static void createNameSpace(Admin admin, String namespace) {
        // 1. Build the namespace descriptor
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
        // 2. Create the namespace
        try {
            admin.createNamespace(namespaceDescriptor);
        } catch (NamespaceExistException e) {
            log.error("{}: namespace already exists", namespace);
        } catch (IOException e) {
            log.error("{}: failed to create namespace", namespace, e);
        }
    }
    public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        // 1. Get the table; try-with-resources closes it even if the put fails
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            // 2. Create the Put for this row key
            Put put = new Put(Bytes.toBytes(rowKey));
            // 3. Set the column family, column name and value
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
            // 4. Write the data
            table.put(put);
        }
    }
    public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz) throws Exception {
        // 1. Reject null or blank arguments
        if (StringUtils.isNullOrWhitespaceOnly(tableName)) {
            log.error("tableName is empty: {}", tableName);
            return null;
        }
        if (StringUtils.isNullOrWhitespaceOnly(rowKey)) {
            log.error("rowKey is empty: {}", rowKey);
            return null;
        }
        if (StringUtils.isNullOrWhitespaceOnly(columnFamily)) {
            log.error("columnFamily is empty: {}", columnFamily);
            return null;
        }
        // 2. Build the Get
        Get get = new Get(Bytes.toBytes(rowKey));
        if (StringUtils.isNullOrWhitespaceOnly(columnName)) {
            // No column name given: read every column in the family
            get.addFamily(Bytes.toBytes(columnFamily));
        } else {
            // Column name given: read only that column
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
        }
        // 3. Read the row; try-with-resources closes the table
        Result resultData;
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            resultData = table.get(get);
        }
        // 4. listCells() returns null for an empty result, so return early when the row is not found
        if (resultData.isEmpty()) {
            return null;
        }
        // 5. Create the bean and populate it from the cells
        T t = clz.getDeclaredConstructor().newInstance();
        List<Cell> cells = resultData.listCells();
        for (Cell cell : cells) {
            String resultColumnName = Bytes.toString(CellUtil.cloneQualifier(cell));
            String resultValue = Bytes.toString(CellUtil.cloneValue(cell));
            // Optionally convert the column name to camelCase so it matches the bean property
            if (underScoreToCamel) {
                if (resultColumnName.contains("_")) {
                    resultColumnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else if (resultColumnName.contains("-")) {
                    resultColumnName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else {
                    resultColumnName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_CAMEL, resultColumnName);
                }
            }
            BeanUtils.setProperty(t, resultColumnName, resultValue);
        }
        return t;
    }
    private static void scanTable(Connection connection, String tableName, String leftScanRowKey, String rightScanRowKey) throws IOException {
        // 1. Build the Scan; the range is left-closed, right-open: [leftScanRowKey, rightScanRowKey)
        Scan scan = new Scan().withStartRow(Bytes.toBytes(leftScanRowKey)).withStopRow(Bytes.toBytes(rightScanRowKey));
        // 2. Open the table and the scanner; both are closed automatically
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner resultScanner = table.getScanner(scan)) {
            // 3. Iterate over the rows and print every cell
            for (Result result : resultScanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                            ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                            ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                            ",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
    public static void deleteData(Connection connection, String tableName, String rowKey, String cf, String cn) throws IOException {
        // 1. Get the table
        Table table = connection.getTable(TableName.valueOf(tableName));
        // 2. Create the Delete for this row key
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        // Alternatives for deleting a single column (cn), optionally up to or at a given timestamp:
        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
        // addFamily: deletes all versions of all columns in the given column family
        delete.addFamily(Bytes.toBytes(cf));
        // 3. Execute the delete
        table.delete(delete);
        // 4. Close the table
        table.close();
    }
    public static void close(Admin admin, Connection connection) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // Packs a row into JSON: {"rowKey": ..., "value": {columnName: columnValue, ...}}
    public static JSONObject getJSonObject(String rowKey, List<Cell> cells) {
        JSONObject jsonObject = new JSONObject();
        JSONObject valueJsonObject = new JSONObject();
        for (Cell cell : cells) {
            valueJsonObject.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }
        jsonObject.put("rowKey", rowKey);
        jsonObject.put("value", valueJsonObject);
        return jsonObject;
    }
}
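The scanTable method above notes that a scan range is left-closed and right-open. As a rough illustration of what that means for sorted row keys, here is a JDK-only analogy using TreeMap.subMap. It is not the HBase API: ScanRangeDemo and its scan method are hypothetical names, and String ordering only matches HBase's byte ordering for plain ASCII keys.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Analogy only (an assumption for illustration): a TreeMap stands in for a table sorted by row key.
class ScanRangeDemo {
    // Returns the rows whose key k satisfies startRow <= k < stopRow
    static SortedMap<String, String> scan(TreeMap<String, String> table, String startRow, String stopRow) {
        return table.subMap(startRow, true, stopRow, false);
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("11110", "a");
        table.put("11111", "b");
        table.put("11112", "c");
        // The stop row itself is excluded
        System.out.println(scan(table, "11111", "11112").keySet()); // prints [11111]
    }
}
```

To include the last row in a real scan, the stop row has to be a key that sorts just after it.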
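A few of the core methods are explained in more detail below.
3.2.1. Creating a table
public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs)
This method creates a table from the arguments passed in. For example:
With the table name "aaron1:test1", if the namespace "aaron1" does not exist, the createNameSpaceIfExists parameter decides what happens. If it is true, the namespace is created first and then the table; if it is false, the call logs an error and returns false. With createNameSpaceIfExists set to false:
public static void main(String[] args) throws Exception{
    createTable(getHbaseConnection().getAdmin(),"aaron1:test1",false,"name","age","id");
}
With createNameSpaceIfExists set to true:
public static void main(String[] args) throws Exception{
    createTable(getHbaseConnection().getAdmin(),"aaron1:test1",true,"name","age","id");
}
The web UI shows that the table was created successfully.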
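The name check in createTable relies on the nameSpace:tableName convention. The sketch below isolates that check in plain Java; TableNameParser and parse are hypothetical names, not part of HbaseUtils. It is slightly stricter than the original, which only checks that split(":") yields two parts and would therefore accept an empty namespace such as ":test1".

```java
// Hypothetical helper (an assumption, not from the original post):
// splits "nameSpace:tableName" into its two parts.
class TableNameParser {
    // Returns {nameSpace, tableName}, or null if the name is not of the form nameSpace:tableName
    static String[] parse(String qualifiedName) {
        if (qualifiedName == null) {
            return null;
        }
        // Limit -1 keeps trailing empty strings, so "aaron1:" is seen as two parts and rejected below
        String[] parts = qualifiedName.split(":", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return null;
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = parse("aaron1:test1");
        System.out.println(parts[0] + " / " + parts[1]); // prints aaron1 / test1
        System.out.println(parse("test1") == null);      // prints true
    }
}
```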
3.2.2. Inserting data
This method writes a single cell using the arguments passed in.
public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value)
public static void main(String[] args) throws Exception{
    putData(getHbaseConnection(),"aaron1:test1","aaaa","name","schoolName","Nanj");
}
The GUI tool shows that the data was inserted successfully.
3.2.3. Querying data
This method looks up a row by its rowKey and returns the result as a JavaBean.
public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz)
public static void main(String[] args) throws Exception {
    PassengerInfor passengerInfor = getData(getHbaseConnection(), "aaron:test1", "11111", "first", "", true, PassengerInfor.class);
    log.info("Query result: {}", passengerInfor);
}
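The underScoreToCamel flag controls how column names are mapped to bean properties. getData uses Guava's CaseFormat for this. The sketch below approximates the same mapping with the JDK only, to make the rule visible. ColumnNameMapper and its methods are hypothetical names, and edge cases such as leading separators may differ from Guava's behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in (an assumption) for the CaseFormat conversion used in getData.
class ColumnNameMapper {
    // "school_name" or "school-name" becomes "schoolName"; names without a separator are returned unchanged
    static String toLowerCamel(String columnName) {
        if (!columnName.contains("_") && !columnName.contains("-")) {
            return columnName;
        }
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : columnName.toLowerCase().toCharArray()) {
            if (c == '_' || c == '-') {
                upperNext = true;   // drop the separator, capitalize the next character
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Maps column name -> value pairs to property name -> value pairs
    static Map<String, String> toProperties(Map<String, String> cells) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (Map.Entry<String, String> cell : cells.entrySet()) {
            properties.put(toLowerCamel(cell.getKey()), cell.getValue());
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("school_name", "Nanj");
        cells.put("passenger-id", "11111");
        System.out.println(toProperties(cells)); // prints {schoolName=Nanj, passengerId=11111}
    }
}
```

For the mapping to work, the target bean needs a setter for each resulting property name (for example setSchoolName).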
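4. Accessing HBase with Flink async I/O
Async I/O functions in Flink are written by extending the abstract class RichAsyncFunction, so to make the logic reusable we first write a generic utility class.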
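Before looking at the Flink class, it may help to see the underlying pattern in isolation: the blocking lookup is handed to a thread pool, and the caller receives a future that is completed later. The sketch below uses only the JDK. AsyncLookupSketch, blockingLookup and lookupAsync are hypothetical names, and an in-memory Map stands in for HBase.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only (an assumption): the Map plays the role of the HBase table.
class AsyncLookupSketch {
    // Stand-in for a blocking HBase Get
    static String blockingLookup(Map<String, String> store, String rowKey) {
        return store.get(rowKey);
    }

    // Runs the blocking lookup on the pool so the calling thread is not blocked
    static CompletableFuture<String> lookupAsync(ExecutorService pool, Map<String, String> store, String rowKey) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(store, rowKey), pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Map<String, String> store = Collections.singletonMap("11111", "Nanj");
            CompletableFuture<String> future = lookupAsync(pool, store, "11111");
            // Wait with a timeout, similar in spirit to the timeout passed to the async operator
            System.out.println(future.get(1, TimeUnit.SECONDS)); // prints Nanj
        } finally {
            pool.shutdown();
        }
    }
}
```

In the Flink version, asyncInvoke plays the role of lookupAsync, and completing the ResultFuture replaces returning the CompletableFuture.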
4.1. Utility class for async I/O access to HBase
The source is shown below. It uses the HBase utility class from the previous section:
package com.horse.utils.hbase;
import com.horse.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
// IN: input record type, OUT: output record type, Hbase: bean type the HBase row is mapped to
@Slf4j
public abstract class HbaseAsyncFunctionUtil<IN, OUT, Hbase> extends RichAsyncFunction<IN, OUT> {
    private Connection connection;
    private ThreadPoolExecutor threadPoolExecutor;
    private Admin hbaseAdmin;
    private Hbase queryHbaseBean;
    private Class<Hbase> clz;
    private Table hbaseTable;
    private String columnFamily;
    private String columnName;
    private String tableName;
    public HbaseAsyncFunctionUtil(String tableName, String columnFamily, String columnName, Class<Hbase> clz) {
        this(tableName, columnFamily, clz);
        this.columnName = columnName;
    }
    public HbaseAsyncFunctionUtil(String tableName, String columnFamily, Class<Hbase> clz) {
        this.clz = clz;
        this.tableName = tableName;
        this.columnFamily = columnFamily;
    }
    // Derives the HBase row key from the input record
    public abstract String setRowKey(IN input);
    // Combines the input record and the HBase result into the output record
    public abstract OUT getList(String rowKey, IN input, Hbase hbaseResult);
    private void setQueryHbaseBean() throws Exception {
        queryHbaseBean = clz.newInstance();
    }
    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        // Run the blocking HBase lookup on the thread pool and complete the future when it returns
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });
    }
    @Override
    public void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        // On timeout, reconnect to HBase and retry the lookup
        close();
        connc();
        log.info("Connection to Hbase TimeOut,Now is reConnect!");
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });
    }
    private OUT getResult(IN input) {
        String rowKey = setRowKey(input);
        try {
            setQueryHbaseBean();
            Hbase hbaseResult = (Hbase) HbaseUtils.getData(connection, tableName, rowKey, columnFamily, columnName, true, queryHbaseBean.getClass());
            return getList(rowKey, input, hbaseResult);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize the connection
        connc();
    }
    @Override
    public void close() throws Exception {
        super.close();
        if (hbaseTable != null) {
            hbaseTable.close();
        }
        if (hbaseAdmin != null) {
            hbaseAdmin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
    private void connc() throws IOException {
        connection = HbaseUtils.getHbaseConnection();
        hbaseAdmin = connection.getAdmin();
        if (!hbaseAdmin.tableExists(TableName.valueOf(tableName))) {
            throw new IOException("Query Hbase Table is not exists!");
        }
        hbaseTable = connection.getTable(TableName.valueOf(tableName));
        // Initialize the thread pool
        threadPoolExecutor = ThreadPoolUtil.getThreadPool();
    }
}
4.2. Usage
As before, nc is used to simulate the input stream.
The main class, AsyncIOQueryHbase, is as follows:
package com.horse.asyncio;
import com.horse.bean.PassengerInfor;
import com.horse.bean.UserLoginLog;
import com.horse.cep.function.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.horse.utils.hbase.HbaseAsyncFunctionUtil;
import java.util.concurrent.TimeUnit;
public class AsyncIOQueryHbase {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction());
        // Type arguments: input record, output record, bean the HBase row is mapped to
        SingleOutputStreamOperator<PassengerInfor> outputStreamOperator = AsyncDataStream.unorderedWait(dataStream,
                new HbaseAsyncFunctionUtil<UserLoginLog, PassengerInfor, PassengerInfor>("aaron:test1", "first", "", PassengerInfor.class) {
                    @Override
                    public String setRowKey(UserLoginLog input) {
                        // Use the user name as the HBase row key
                        return input.getUserName();
                    }
                    @Override
                    public PassengerInfor getList(String rowKey, UserLoginLog input, PassengerInfor hbaseResult) {
                        return hbaseResult;
                    }
                }, 1000, TimeUnit.MINUTES, 100);
        outputStreamOperator.print("Async I/O HBase query result");
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Enter the following data in nc:
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"11111"}
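Check the log:
The log output shows that the result was successfully queried from HBase.
If you found this post useful, feel free to show your support; it is much appreciated. If anything is unclear or not covered well, questions are welcome.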



