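Our project recently started using HBase as its data store. After a good deal of searching online and consolidating the common approaches I found, I wrapped the HBase operations into my own set of general-purpose methods, and I am publishing them here in the hope that they help others. I will update this post as new general-purpose features are added. These methods do not use the Spring Boot HBase integration; they operate on HBase through the native Java API.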
Development environment
OS: Windows
JDK: 1.8
IDE: IntelliJ IDEA
Virtual machines: VMware
Three Linux nodes: 172.162.15.111, 172.162.15.112, 172.162.15.113
Linux: CentOS 7 Minimal (CentOS-7-x86_64-Minimal-1708.iso)
Hadoop: Hadoop-3.1.2
HBase: hbase-2.0.5
ZooKeeper: zookeeper-3.4.1
Step 2: Configure the Windows hosts file (optional, depending on your needs)
# Virtual machine addresses
172.162.15.111 davidone
172.162.15.112 davidtwo
172.162.15.113 davidthree
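Step 3: Project dependencies in pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>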
application.yml: add these settings to your existing file
logging:
  config: classpath:Logback.xml
bigdata:
  config:
    hbaseConfigMap: {hbase.zookeeper.quorum: 'davidone:2181,davidtwo:2181,davidthree:2181'}
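The hbase.zookeeper.quorum value above is a comma-separated list of host:port pairs. If you want a quick sanity check on that string before handing it to HBase, something along these lines could work. It is only a sketch using the JDK; the class name QuorumCheck and the method parse are made up for illustration and are not part of HBase or of this project.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

final class QuorumCheck {
    // Splits "host:port,host:port" into unresolved socket addresses (no DNS lookup).
    static List<InetSocketAddress> parse(String quorum) {
        List<InetSocketAddress> nodes = new ArrayList<>();
        for (String entry : quorum.split(",")) {
            String[] hostPort = entry.trim().split(":");
            if (hostPort.length != 2 || hostPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            nodes.add(InetSocketAddress.createUnresolved(hostPort[0], Integer.parseInt(hostPort[1])));
        }
        return nodes;
    }

    public static void main(String[] args) {
        for (InetSocketAddress node : parse("davidone:2181,davidtwo:2181,davidthree:2181")) {
            System.out.println(node.getHostString() + " -> " + node.getPort());
        }
    }
}
```

A malformed entry such as a missing port fails fast with an exception instead of surfacing later as a connection timeout.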
Step 4: Core code
@Component()
@ConfigurationProperties(prefix = "bigdata.config")
public class BigDataConfig {
private Map<String, String> hbaseConfigMap;
public Map<String, String> gethbaseConfigMap() {
return hbaseConfigMap;
}
public void sethbaseConfigMap(Map<String, String> hbaseConfigMap) {
this.hbaseConfigMap = hbaseConfigMap;
}
}
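HbaseRepository.java contains the general-purpose HBase data-access code, including secondary lookups, pagination, and multi-condition queries. Some familiarity with HBase shell operations will make these methods much easier to follow.
I recommend first reading this blogger's article: Hbase学习笔记 (HBase Study Notes)
The sections below walk through HbaseRepository piece by piece.
4.1 Initializing HBase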
@Autowired
private BigDataConfig bigDataConfig;
private static Configuration configuration = HBaseConfiguration.create();
// Thread pool handed to the HBase connection
private static ExecutorService executorServicePoolSize = Executors.newScheduledThreadPool(20);
private ThreadLocal<Connection> connectionThreadLocal = new ThreadLocal<>();
private ThreadLocal<Admin> adminThreadLocal = new ThreadLocal<>();
private static Connection connection = null;
private static Admin admin = null;
private final byte[] POSTFIX = new byte[] { 0x00 };
private void initConfigurationInfo(){
Map<String, String> hbaseConfigMap = bigDataConfig.gethbaseConfigMap();
if(hbaseConfigMap == null || hbaseConfigMap.isEmpty()){
logger.debug(MessageFormat.format("Hbase配置信息初始化失败:{0}",JSON.toJSONString(hbaseConfigMap)));
}else{
for (Map.Entry<String, String> confEntry : hbaseConfigMap.entrySet()) {
configuration.set(confEntry.getKey(), confEntry.getValue());
}
}
}
private void initHbaseClientAdmin(){
try{
admin = adminThreadLocal.get();
if(admin == null && connection != null){
admin = connection.getAdmin();
adminThreadLocal.set(admin);
}else{
logger.debug(MessageFormat.format("创建hbase connection连接 失败:{0}",connection));
}
}catch (Exception e){
logger.error(MessageFormat.format("初始化hbase client admin 客户端管理失败:错误信息:{0}",e));
}
}
public void initRepository(){
try{
initConfigurationInfo();
connection = connectionThreadLocal.get();
if(connection == null){
connection = ConnectionFactory.createConnection(configuration, executorServicePoolSize);
connectionThreadLocal.set(connection);
}
initHbaseClientAdmin();
}catch (Exception e){
logger.error(MessageFormat.format("创建hbase connection 连接失败:{0}",e));
e.printStackTrace();
}finally {
close(admin,null,null);
}
}
4.2 Shared helper methods
private Table getTable(String tableName) {
try {
return connection.getTable(TableName.valueOf(tableName));
}catch (Exception e){
e.printStackTrace();
return null;
}
}
private int createTable(String tableName,List<String> columnFamily) throws Exception {
if(admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("创建Hbase表名:{0} 在Hbase数据库中已经存在",tableName));
return 2;
}else{
List<ColumnFamilyDescriptor> familyDescriptors =new ArrayList<>(columnFamily.size());
for(String column : columnFamily){
familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
}
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(familyDescriptors).build();
admin.createTable(tableDescriptor);
logger.info(MessageFormat.format("创建表成功:表名:{0},列簇:{1}",tableName,JSON.toJSONString(columnFamily)));
return 1;
}
}
private FilterList handlePageFilterData(Scan scan, int pageSize,String lastRowKey){
Filter pageFilter = new PageFilter(pageSize);
FilterList pageFilterList = new FilterList();
pageFilterList.addFilter(pageFilter);
if(!StringUtils.isEmpty(lastRowKey)){
byte[] startRow = Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX);
scan.setStartRow(startRow);
}
return pageFilterList;
}
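handlePageFilterData appends a single 0x00 byte to the last row key of the previous page. HBase orders row keys as unsigned bytes, so that key is the smallest one strictly greater than the last row, and the next scan starts right after it. The sketch below shows only that ordering argument with plain JDK code; NextRowKeyDemo and its methods are illustrative names, not HBase APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class NextRowKeyDemo {
    // Unsigned lexicographic comparison, the ordering HBase applies to row keys.
    static int compareRowKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    // Same bytes as lastRowKey plus one trailing 0x00 (Arrays.copyOf pads with zeros).
    static byte[] nextStartRow(String lastRowKey) {
        byte[] last = lastRowKey.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(last, last.length + 1);
    }

    public static void main(String[] args) {
        byte[] last = "105155".getBytes(StandardCharsets.UTF_8);
        byte[] start = nextStartRow("105155");
        byte[] following = "105156".getBytes(StandardCharsets.UTF_8);
        System.out.println(compareRowKeys(start, last) > 0);      // true: sorts after the last row
        System.out.println(compareRowKeys(start, following) < 0); // true: sorts before the next key
    }
}
```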
private FilterList queryFilterData(String columnFamily,List<String> queryParam,String regex,boolean bool){
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if(!bool){
filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
for(String param: queryParam){
String[] queryArray = param.split(regex);
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
singleColumnValueFilter.setFilterIfMissing(true);
filterList.addFilter(singleColumnValueFilter);
}
return filterList;
}
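Each entry in queryParam packs a column name and an expected value into one string, separated by whatever regex matches, and the boolean flag chooses between AND (MUST_PASS_ALL) and OR (MUST_PASS_ONE). To make that contract concrete, here is an in-memory imitation that evaluates the same conditions against a single row held in a Map. It is a rough sketch, not how HBase evaluates filters; QueryParamDemo and matches are hypothetical names, and unlike the method above it splits with a limit of 2 so that values containing the separator survive.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class QueryParamDemo {
    // matchAll = true behaves like AND, false like OR.
    static boolean matches(Map<String, String> row, List<String> queryParam, String regex, boolean matchAll) {
        for (String param : queryParam) {
            String[] parts = param.split(regex, 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed condition: " + param);
            }
            // A missing column counts as a non-match, similar to setFilterIfMissing(true).
            boolean hit = parts[1].equals(row.get(parts[0]));
            if (matchAll && !hit) {
                return false;
            }
            if (!matchAll && hit) {
                return true;
            }
        }
        return matchAll;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("dept", "Marketing");
        row.put("title", "Demo");
        List<String> conditions = Arrays.asList("dept,Marketing", "level,High");
        System.out.println(matches(row, conditions, ",", true));  // false: "level" is missing
        System.out.println(matches(row, conditions, ",", false)); // true: "dept" matches
    }
}
```

Because String.split treats the separator as a regular expression, a separator such as | or . has to be escaped by the caller.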
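private List<Map<String, String>> queryData(Table table,Scan scan){
ResultScanner resultScanner =null;
List<Map<String, String>> resultList = new ArrayList<>();
try {
resultScanner = table.getScanner(scan);
for(Result result : resultScanner){
logger.info(MessageFormat.format("查询每条Hbase数据的行号:{0}",Bytes.toString(result.getRow())));
Map<String, String> resultMap = new HashMap<>();
resultMap.put("rowKey",Bytes.toString(result.getRow()));
for(Cell cell :result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
resultList.add(resultMap);
}
logger.info(MessageFormat.format("查询指定表中数据信息:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),JSON.toJSONString(resultList)));
} catch (Exception e) {
e.printStackTrace();
logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
}finally {
close(null,resultScanner,table);
}
return resultList;
}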
4.3 Creating one table or several tables at once, as needed
public boolean createManyTable(Map<String, List<String>> tableMap){
try{
if(admin != null){
for (Map.Entry<String, List<String>> confEntry : tableMap.entrySet()) {
createTable(confEntry.getKey(), confEntry.getValue());
}
}
}catch (Exception e){
logger.error(MessageFormat.format("创建多个表出现未知错误:{0}",e.getMessage()));
e.printStackTrace();
return false;
}finally {
close(admin,null,null);
}
return true;
}
public int createoneTable (String tableName,String... columnFamily){
try{
// Create the table; createTable first checks whether it already exists and skips creation if so
if(admin != null){
return createTable(tableName,Arrays.asList(columnFamily));
}else{
logger.error("admin变量没有初始化成功");
return 0;
}
}catch (Exception e){
logger.debug(MessageFormat.format("创建表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
e.printStackTrace();
return 0;
}finally {
close(admin,null,null);
}
}
4.4 Inserting or updating multiple columns or a single column
public boolean insertManyColumnRecords(String tableName,String row,String columnFamily,List<String> columns,List<String> values){
try{
Table table = getTable(tableName);
Put put = new Put(Bytes.toBytes(row));
for(int i = 0; i < columns.size(); i++){
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columns.get(i)),Bytes.toBytes(values.get(i)));
}
// Send the Put once, after every column has been added
table.put(put);
logger.info(MessageFormat.format("添加单行单列族-多列多值数据成功:表名:{0},列名:{1},列值:{2}",tableName, JSON.toJSONString(columns),JSON.toJSONString(values)));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("添加单行单列族-多列多值数据失败:表名:{0};错误信息是:{1}",tableName,e.getMessage()));
return false;
}
}
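insertManyColumnRecords pairs columns.get(i) with values.get(i) by index, so a shorter values list fails partway through with an IndexOutOfBoundsException, and a longer one silently drops the extras. One way to guard against that is to pair the two lists up front. The helper below is a sketch with made-up names (ColumnValuePairs, zip), not something from the repository class.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ColumnValuePairs {
    // Pairs column names with values in order; rejects lists of different length.
    static Map<String, String> zip(List<String> columns, List<String> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException(
                "columns has " + columns.size() + " entries but values has " + values.size());
        }
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            cells.put(columns.get(i), values.get(i));
        }
        return cells;
    }

    public static void main(String[] args) {
        Map<String, String> cells = zip(Arrays.asList("id", "title"), Arrays.asList("105155", "Demo"));
        System.out.println(cells); // {id=105155, title=Demo}
    }
}
```

Each entry of the resulting map could then be passed to put.addColumn in a loop, with the mismatch caught before anything is written.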
public boolean insertoneColumnRecords(String tableName,String row,String columnFamily,String column,String value){
try{
Table table = getTable(tableName);
Put put = new Put(Bytes.toBytes(row));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
table.put(put);
logger.info(MessageFormat.format("添加单行单列族-单列单值数据成功:表名:{0},列名:{1},列值:{2}",tableName,column,value));
return true;
}catch (Exception e){
logger.debug(MessageFormat.format("添加单行单列族-单列单值数据失败:表名:{0},错误信息是:{1}",tableName,e.getMessage()));
e.printStackTrace();
return false;
}
}
4.5 Deleting data or dropping tables in HBase
public boolean deleteDataByRowNumber(String tableName,String rowNumber){
try{
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
table.delete(delete);
logger.info(MessageFormat.format("根据行号删除表中记录成功:表名:{0},行号:{1}",tableName,rowNumber));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("根据行号删除表中记录失败:表名:{0},行号:{1}",tableName,rowNumber));
return false;
}
}
public boolean deleteDataByColumnFamily(String tableName,String columnFamily){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号和列簇名称删除这行列簇相关的数据失败:表名不存在:{0}",tableName));
return false;
}
admin.deleteColumnFamily(TableName.valueOf(tableName),Bytes.toBytes(columnFamily));
logger.info(MessageFormat.format("删除该表中列簇下所有数据成功:表名:{0},列簇:{1}",tableName,columnFamily));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("删除该表中列簇下所有数据失败:表名:{0},列簇:{1},错误信息:{2}",tableName,columnFamily,e.getMessage()));
return false;
}
}
public boolean deleteDataByColumn(String tableName,String rowNumber,String columnFamily,String cloumn){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名不存在:{0}",tableName));
return false;
}
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
table.delete(delete);
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
return true;
}catch (Exception e){
e.printStackTrace();
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
return false;
}
}
public boolean deleteDataByAllcolumn(String tableName,String rowNumber,String columnFamily,String cloumn){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名不存在:{0}",tableName));
return false;
}
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
table.delete(delete);
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
return true;
}catch (Exception e){
e.printStackTrace();
logger.error(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
return false;
}
}
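The only difference between deleteDataByColumn and deleteDataByAllcolumn is Delete.addColumn versus Delete.addColumns: the first targets just the latest version of the cell, the second targets every version. The toy model below keeps the versions of one cell in a sorted map to show what a reader would see afterwards. It is an in-memory illustration with hypothetical names, not HBase's storage model; in a real table, whether an older value reappears also depends on how many versions the column family retains.

```java
import java.util.Comparator;
import java.util.TreeMap;

final class VersionedCellDemo {
    // timestamp -> value, newest timestamp first
    private final TreeMap<Long, String> versions =
        new TreeMap<Long, String>(Comparator.<Long>reverseOrder());

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    // What a plain read sees: the newest remaining version, or null if none is left.
    String read() {
        return versions.isEmpty() ? null : versions.firstEntry().getValue();
    }

    // Analogous to Delete.addColumn: drop only the newest version.
    void deleteLatest() {
        versions.pollFirstEntry();
    }

    // Analogous to Delete.addColumns: drop every version.
    void deleteAll() {
        versions.clear();
    }

    public static void main(String[] args) {
        VersionedCellDemo cell = new VersionedCellDemo();
        cell.put(1L, "old");
        cell.put(2L, "new");
        cell.deleteLatest();
        System.out.println(cell.read()); // old
        cell.deleteAll();
        System.out.println(cell.read()); // null
    }
}
```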
public boolean deleteTable(String tableName){
try{
TableName table = TableName.valueOf(tableName);
if(admin.tableExists(table)){
// Disable the table first, then delete it
admin.disableTable(table);
admin.deleteTable(table);
}
logger.info(MessageFormat.format("删除表成功:{0}",tableName));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("删除表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
return false;
}finally {
close(admin,null,null);
}
}
4.6 Querying HBase. Multi-condition queries can combine conditions with AND or OR, but fuzzy matching is not supported.
public List<String> getAllTableNames(){
List<String> resultList = new ArrayList<>();
try {
TableName[] tableNames = admin.listTableNames();
for(TableName tableName : tableNames){
resultList.add(tableName.getNameAsString());
}
logger.info(MessageFormat.format("查询库中所有表的表名成功:{0}",JSON.toJSONString(resultList)));
}catch (IOException e) {
logger.error("获取所有表的表名失败",e);
}finally {
close(admin,null,null);
}
return resultList;
}
public Map<String, String> selectoneRowDataMap(String tableName,String rowNumber){
Map<String, String> resultMap = new HashMap<>();
Get get = new Get(Bytes.toBytes(rowNumber));
Table table = getTable(tableName);
try{
Result result = table.get(get);
if(result !=null && !result.isEmpty()){
for(Cell cell : result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
}
logger.info(MessageFormat.format("根据表名和行号查询数据:表名:{0},行号:{1},查询结果:{2}",tableName,rowNumber,JSON.toJSONString(resultMap)));
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("根据表名和行号查询数据失败:表名:{0},行号:{1},错误信息:{2}",tableName,rowNumber,e.getMessage()));
}finally {
close(null,null,table);
}
return resultMap;
}
public List<Map<String, String>> selectTableDataByFilter(String tableName,String columnFamily,List<String> queryParam,String regex,boolean bool){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
scan.setFilter(filterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectColumnValueDataByFilter(String tableName,String columnFamily,List<String> queryParam,String regex,String column,boolean bool){
Scan scan = new Scan();
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if(!bool){
filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
Table table = getTable(tableName);
for(String param: queryParam){
String[] queryArray = param.split(regex);
SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
filterList.addFilter(singleColumnValueExcludeFilter);
scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(queryArray[0]));
}
scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
scan.setFilter(filterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMap(String tableName){
Table table = getTable(tableName);
Scan scan = new Scan();
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMap(String tableName,String columnFamily){
Table table = getTable(tableName);
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(columnFamily));
return queryData(table,scan);
}
public Map<String, String> selectTableByRowNumberAndColumnFamily(String tableName,String rowNumber,String columnFamily){
ResultScanner resultScanner = null;
Map<String, String> resultMap = new HashMap<>();
Table table = getTable(tableName);
try {
Get get = new Get(Bytes.toBytes(rowNumber));
get.addFamily(Bytes.toBytes(columnFamily));
Result result = table.get(get);
// listCells() returns null for an empty result, so check before iterating
if(result != null && !result.isEmpty()){
for(Cell cell :result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
}
logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},查询结果:{3}",tableName,rowNumber,columnFamily,JSON.toJSONString(resultMap)));
} catch (IOException e) {
e.printStackTrace();
logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},错误信息:{3}",tableName,rowNumber,columnFamily,e.getMessage()));
}finally {
close(null,resultScanner,table);
}
return resultMap;
}
public String selectColumnValue(String tableName,String rowNumber,String columnFamily,String column){
Table table = getTable(tableName);
try {
Get get = new Get(Bytes.toBytes(rowNumber));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
Result result = table.get(get);
logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},查询结果:{4}",tableName,rowNumber,columnFamily,column,Bytes.toString(result.value())));
return Bytes.toString(result.value());
} catch (IOException e) {
e.printStackTrace();
logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},错误信息:{4}",tableName,rowNumber,columnFamily,column,e.getMessage()));
return "";
}finally {
close(null,null,table);
}
}
4.7 Paginated queries against HBase
public List<Map<String, String>> selectTableDataByFilterPage(String tableName,String columnFamily,List<String> queryParam,String regex,boolean bool,int pageSize,String lastRow){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
pageFilterList.addFilter(filterList);
scan.setFilter(pageFilterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMapPage(String tableName,int pageSize,String lastRow){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
scan.setFilter(pageFilterList);
return queryData(table,scan);
}
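Both paging methods are cursor-based: the caller passes the row key of the last row it received (the rowKey entry that queryData puts into each result map) and gets the next pageSize rows after it. The loop below imitates that protocol over an in-memory sorted set so the calling pattern is visible without a cluster. CursorPagingDemo and page are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

final class CursorPagingDemo {
    // Returns up to pageSize keys strictly after lastRow; null or empty lastRow means the first page.
    static List<String> page(NavigableSet<String> rowKeys, int pageSize, String lastRow) {
        NavigableSet<String> tail =
            (lastRow == null || lastRow.isEmpty()) ? rowKeys : rowKeys.tailSet(lastRow, false);
        List<String> result = new ArrayList<>();
        for (String key : tail) {
            if (result.size() == pageSize) {
                break;
            }
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableSet<String> keys = new TreeSet<>(Arrays.asList("r1", "r2", "r3", "r4", "r5"));
        String lastRow = null;
        while (true) {
            List<String> current = page(keys, 2, lastRow);
            if (current.isEmpty()) {
                break;
            }
            System.out.println(current); // [r1, r2], then [r3, r4], then [r5]
            lastRow = current.get(current.size() - 1);
        }
    }
}
```

One caveat from the PageFilter documentation: the filter is applied separately on each region server, so a scan that spans regions can return more than pageSize rows, and the caller may need to trim the result.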
The complete HbaseRepository class follows
@Component
public class HbaseRepository{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private BigDataConfig bigDataConfig;
private static Configuration configuration = HBaseConfiguration.create();
// Thread pool handed to the HBase connection
private static ExecutorService executorServicePoolSize = Executors.newScheduledThreadPool(20);
private ThreadLocal<Connection> connectionThreadLocal = new ThreadLocal<>();
private ThreadLocal<Admin> adminThreadLocal = new ThreadLocal<>();
private static Connection connection = null;
private static Admin admin = null;
private final byte[] POSTFIX = new byte[] { 0x00 };
private void initConfigurationInfo(){
Map<String, String> hbaseConfigMap = bigDataConfig.gethbaseConfigMap();
if(hbaseConfigMap == null || hbaseConfigMap.isEmpty()){
logger.debug(MessageFormat.format("Hbase配置信息初始化失败:{0}",JSON.toJSONString(hbaseConfigMap)));
}else{
for (Map.Entry<String, String> confEntry : hbaseConfigMap.entrySet()) {
configuration.set(confEntry.getKey(), confEntry.getValue());
}
}
}
private void initHbaseClientAdmin(){
try{
admin = adminThreadLocal.get();
if(admin == null && connection != null){
admin = connection.getAdmin();
adminThreadLocal.set(admin);
}else{
logger.debug(MessageFormat.format("创建hbase connection连接 失败:{0}",connection));
}
}catch (Exception e){
logger.error(MessageFormat.format("初始化hbase client admin 客户端管理失败:错误信息:{0}",e));
}
}
public void initRepository(){
try{
initConfigurationInfo();
connection = connectionThreadLocal.get();
if(connection == null){
connection = ConnectionFactory.createConnection(configuration, executorServicePoolSize);
connectionThreadLocal.set(connection);
}
initHbaseClientAdmin();
}catch (Exception e){
logger.error(MessageFormat.format("创建hbase connection 连接失败:{0}",e));
e.printStackTrace();
}finally {
close(admin,null,null);
}
}
private Table getTable(String tableName) {
try {
return connection.getTable(TableName.valueOf(tableName));
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public boolean createManyTable(Map<String, List<String>> tableMap){
try{
if(admin != null){
for (Map.Entry<String, List<String>> confEntry : tableMap.entrySet()) {
createTable(confEntry.getKey(), confEntry.getValue());
}
}
}catch (Exception e){
logger.error(MessageFormat.format("创建多个表出现未知错误:{0}",e.getMessage()));
e.printStackTrace();
return false;
}finally {
close(admin,null,null);
}
return true;
}
public int createoneTable (String tableName,String... columnFamily){
try{
// Create the table; createTable first checks whether it already exists and skips creation if so
if(admin != null){
return createTable(tableName,Arrays.asList(columnFamily));
}else{
logger.error("admin变量没有初始化成功");
return 0;
}
}catch (Exception e){
logger.debug(MessageFormat.format("创建表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
e.printStackTrace();
return 0;
}finally {
close(admin,null,null);
}
}
private int createTable(String tableName,List<String> columnFamily) throws Exception {
if(admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("创建Hbase表名:{0} 在Hbase数据库中已经存在",tableName));
return 2;
}else{
List<ColumnFamilyDescriptor> familyDescriptors =new ArrayList<>(columnFamily.size());
for(String column : columnFamily){
familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
}
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(familyDescriptors).build();
admin.createTable(tableDescriptor);
logger.info(MessageFormat.format("创建表成功:表名:{0},列簇:{1}",tableName,JSON.toJSONString(columnFamily)));
return 1;
}
}
public boolean insertManyColumnRecords(String tableName,String row,String columnFamily,List<String> columns,List<String> values){
try{
Table table = getTable(tableName);
Put put = new Put(Bytes.toBytes(row));
for(int i = 0; i < columns.size(); i++){
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columns.get(i)),Bytes.toBytes(values.get(i)));
}
// Send the Put once, after every column has been added
table.put(put);
logger.info(MessageFormat.format("添加单行单列族-多列多值数据成功:表名:{0},列名:{1},列值:{2}",tableName, JSON.toJSONString(columns),JSON.toJSONString(values)));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("添加单行单列族-多列多值数据失败:表名:{0};错误信息是:{1}",tableName,e.getMessage()));
return false;
}
}
public boolean insertoneColumnRecords(String tableName,String row,String columnFamily,String column,String value){
try{
Table table = getTable(tableName);
Put put = new Put(Bytes.toBytes(row));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
table.put(put);
logger.info(MessageFormat.format("添加单行单列族-单列单值数据成功:表名:{0},列名:{1},列值:{2}",tableName,column,value));
return true;
}catch (Exception e){
logger.debug(MessageFormat.format("添加单行单列族-单列单值数据失败:表名:{0},错误信息是:{1}",tableName,e.getMessage()));
e.printStackTrace();
return false;
}
}
public boolean deleteDataByRowNumber(String tableName,String rowNumber){
try{
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
table.delete(delete);
logger.info(MessageFormat.format("根据行号删除表中记录成功:表名:{0},行号:{1}",tableName,rowNumber));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("根据行号删除表中记录失败:表名:{0},行号:{1}",tableName,rowNumber));
return false;
}
}
public boolean deleteDataByColumnFamily(String tableName,String columnFamily){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号和列簇名称删除这行列簇相关的数据失败:表名不存在:{0}",tableName));
return false;
}
admin.deleteColumnFamily(TableName.valueOf(tableName),Bytes.toBytes(columnFamily));
logger.info(MessageFormat.format("删除该表中列簇下所有数据成功:表名:{0},列簇:{1}",tableName,columnFamily));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("删除该表中列簇下所有数据失败:表名:{0},列簇:{1},错误信息:{2}",tableName,columnFamily,e.getMessage()));
return false;
}
}
public boolean deleteDataByColumn(String tableName,String rowNumber,String columnFamily,String cloumn){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名不存在:{0}",tableName));
return false;
}
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
table.delete(delete);
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
return true;
}catch (Exception e){
e.printStackTrace();
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
return false;
}
}
public boolean deleteDataByAllcolumn(String tableName,String rowNumber,String columnFamily,String cloumn){
try{
if(!admin.tableExists(TableName.valueOf(tableName))){
logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名不存在:{0}",tableName));
return false;
}
Table table = getTable(tableName);
Delete delete = new Delete(Bytes.toBytes(rowNumber));
delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
table.delete(delete);
logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
return true;
}catch (Exception e){
e.printStackTrace();
logger.error(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
return false;
}
}
public boolean deleteTable(String tableName){
try{
TableName table = TableName.valueOf(tableName);
if(admin.tableExists(table)){
// Disable the table first, then delete it
admin.disableTable(table);
admin.deleteTable(table);
}
logger.info(MessageFormat.format("删除表成功:{0}",tableName));
return true;
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("删除表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
return false;
}finally {
close(admin,null,null);
}
}
public List<String> getAllTableNames(){
List<String> resultList = new ArrayList<>();
try {
TableName[] tableNames = admin.listTableNames();
for(TableName tableName : tableNames){
resultList.add(tableName.getNameAsString());
}
logger.info(MessageFormat.format("查询库中所有表的表名成功:{0}",JSON.toJSONString(resultList)));
}catch (IOException e) {
logger.error("获取所有表的表名失败",e);
}finally {
close(admin,null,null);
}
return resultList;
}
public Map<String, String> selectoneRowDataMap(String tableName,String rowNumber){
Map<String, String> resultMap = new HashMap<>();
Get get = new Get(Bytes.toBytes(rowNumber));
Table table = getTable(tableName);
try{
Result result = table.get(get);
if(result !=null && !result.isEmpty()){
for(Cell cell : result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
}
logger.info(MessageFormat.format("根据表名和行号查询数据:表名:{0},行号:{1},查询结果:{2}",tableName,rowNumber,JSON.toJSONString(resultMap)));
}catch (Exception e){
e.printStackTrace();
logger.debug(MessageFormat.format("根据表名和行号查询数据失败:表名:{0},行号:{1},错误信息:{2}",tableName,rowNumber,e.getMessage()));
}finally {
close(null,null,table);
}
return resultMap;
}
public List<Map<String, String>> selectTableDataByFilter(String tableName,String columnFamily,List<String> queryParam,String regex,boolean bool){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
scan.setFilter(filterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectTableDataByFilterPage(String tableName,String columnFamily,List<String> queryParam,String regex,boolean bool,int pageSize,String lastRow){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
pageFilterList.addFilter(filterList);
scan.setFilter(pageFilterList);
return queryData(table,scan);
}
private FilterList handlePageFilterData(Scan scan, int pageSize,String lastRowKey){
Filter pageFilter = new PageFilter(pageSize);
FilterList pageFilterList = new FilterList();
pageFilterList.addFilter(pageFilter);
if(!StringUtils.isEmpty(lastRowKey)){
byte[] startRow = Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX);
scan.setStartRow(startRow);
}
return pageFilterList;
}
private FilterList queryFilterData(String columnFamily,List<String> queryParam,String regex,boolean bool){
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if(!bool){
filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
for(String param: queryParam){
String[] queryArray = param.split(regex);
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
singleColumnValueFilter.setFilterIfMissing(true);
filterList.addFilter(singleColumnValueFilter);
}
return filterList;
}
public List<Map<String, String>> selectColumnValueDataByFilter(String tableName,String columnFamily,List<String> queryParam,String regex,String column,boolean bool){
Scan scan = new Scan();
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if(!bool){
filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
Table table = getTable(tableName);
for(String param: queryParam){
String[] queryArray = param.split(regex);
SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
filterList.addFilter(singleColumnValueExcludeFilter);
scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(queryArray[0]));
}
scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
scan.setFilter(filterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMap(String tableName){
Table table = getTable(tableName);
Scan scan = new Scan();
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMapPage(String tableName,int pageSize,String lastRow){
Scan scan = new Scan();
Table table = getTable(tableName);
FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
scan.setFilter(pageFilterList);
return queryData(table,scan);
}
public List<Map<String, String>> selectTableAllDataMap(String tableName,String columnFamily){
Table table = getTable(tableName);
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(columnFamily));
return queryData(table,scan);
}
public Map<String, String> selectTableByRowNumberAndColumnFamily(String tableName,String rowNumber,String columnFamily){
ResultScanner resultScanner = null;
Map<String, String> resultMap = new HashMap<>();
Table table = getTable(tableName);
try {
Get get = new Get(Bytes.toBytes(rowNumber));
get.addFamily(Bytes.toBytes(columnFamily));
Result result = table.get(get);
// listCells() returns null for an empty result, so check before iterating
if(result != null && !result.isEmpty()){
for(Cell cell :result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
}
logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},查询结果:{3}",tableName,rowNumber,columnFamily,JSON.toJSONString(resultMap)));
} catch (IOException e) {
e.printStackTrace();
logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},错误信息:{3}",tableName,rowNumber,columnFamily,e.getMessage()));
}finally {
close(null,resultScanner,table);
}
return resultMap;
}
public String selectColumnValue(String tableName,String rowNumber,String columnFamily,String column){
Table table = getTable(tableName);
try {
Get get = new Get(Bytes.toBytes(rowNumber));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
Result result = table.get(get);
logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},查询结果:{4}",tableName,rowNumber,columnFamily,column,Bytes.toString(result.value())));
return Bytes.toString(result.value());
} catch (IOException e) {
e.printStackTrace();
logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},错误信息:{4}",tableName,rowNumber,columnFamily,column,e.getMessage()));
return "";
}finally {
close(null,null,table);
}
}
private List<Map<String, String>> queryData(Table table,Scan scan){
ResultScanner resultScanner =null;
List<Map<String, String>> resultList = new ArrayList<>();
try {
resultScanner = table.getScanner(scan);
for(Result result : resultScanner){
logger.info(MessageFormat.format("查询每条Hbase数据的行号:{0}",Bytes.toString(result.getRow())));
Map<String, String> resultMap = new HashMap<>();
resultMap.put("rowKey",Bytes.toString(result.getRow()));
for(Cell cell :result.listCells()){
resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
resultList.add(resultMap);
}
logger.info(MessageFormat.format("查询指定表中数据信息:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),JSON.toJSONString(resultList)));
} catch (Exception e) {
e.printStackTrace();
logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
}finally {
close(null,resultScanner,table);
}
return resultList;
}
public int getTableDataCount(String tableName){
Table table = getTable(tableName);
Scan scan = new Scan();
return queryDataCount(table,scan);
}
public int getTableDataCount(String tableName,String columnFamily){
Table table = getTable(tableName);
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(columnFamily));
return queryDataCount(table,scan);
}
private int queryDataCount(Table table,Scan scan){
scan.setFilter(new FirstKeyOnlyFilter());
ResultScanner resultScanner = null;
int rowCount = 0;
try {
resultScanner = table.getScanner(scan);
for(Result result : resultScanner){
rowCount += result.size();
}
logger.info(MessageFormat.format("统计全表数据总数:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),rowCount));
return rowCount;
} catch (Exception e) {
e.printStackTrace();
logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
return rowCount;
}finally {
close(null,resultScanner,table);
}
}
private void close(Admin admin, ResultScanner rs, Table table){
if(admin != null){
try {
admin.close();
} catch (IOException e) {
logger.error("关闭Admin失败",e);
}
}
if(rs != null){
rs.close();
}
if(table != null){
try {
table.close();
} catch (IOException e) {
logger.error("关闭Table失败",e);
}
}
}
}
Test cases for the HbaseRepository general-purpose methods
@RunWith(SpringRunner.class)
@SpringBootTest
public class HbaseUtilsTest {
@Autowired
private HbaseRepository hbaseRepository;
@Test
public void initTable(){
hbaseRepository.initRepository();
Map<String, List<String>> tableMap = new HashMap<>();
tableMap.put("david_topic:actionFlow",Arrays.asList("info","logs"));
tableMap.put("david_topic:recording",Arrays.asList("info","logs"));
tableMap.put("david_topic:syncLog",Arrays.asList("info","logs"));
tableMap.put("david_topic:uploadVersion",Arrays.asList("info","logs"));
hbaseRepository.createManyTable(tableMap);
}
@Test
public void createoneTable(){
// Create a table in the specified namespace
hbaseRepository.createoneTable("david_topic:topictest","info","log");
}
@Test
public void deleteTable(){
hbaseRepository.deleteTable("david_topic:topictest");
}
@Test
public void insertManyColumnRecords(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
String columnFamily = "info";
List<String> columns = Arrays.asList("id", "title", "poform", "dept", "level", "createUser");
List<String> values = Arrays.asList("105155", "瑞风S4品牌临时页面", "Mobile", "营销中心", "普通", "王丽");
hbaseRepository.insertManyColumnRecords(tableName, rowNumber, columnFamily, columns, values);
}
@Test
public void selectTableByRowNumberAndColumnFamily(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
String columnFamily = "info";
hbaseRepository.selectTableByRowNumberAndColumnFamily(tableName,rowNumber,columnFamily);
}
@Test
public void getAllTableNames(){
hbaseRepository.getAllTableNames();
}
@Test
public void insertoneColumnRecords(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
String columnFamily = "info";
hbaseRepository.insertoneColumnRecords(tableName, rowNumber, columnFamily, "delete", "否");
}
@Test
public void deleteDataBycolumn(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
String columnFamily = "info";
String column = "delete";
hbaseRepository.deleteDataByColumn(tableName,rowNumber,columnFamily,column);
}
@Test
public void getTableDataCount(){
String tableName = "david_topic:topictest";
hbaseRepository.getTableDataCount(tableName,"log");
}
@Test
public void deleteDataByRowNumber(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
hbaseRepository.deleteDataByRowNumber(tableName, rowNumber);
}
@Test
public void selectTableAllDataMap(){
String tableName = "david_topic:topictest";
hbaseRepository.selectTableAllDataMap(tableName);
}
@Test
public void selectTableAllDataMapColumnFamily(){
String tableName = "david_topic:topictest";
String columnFamily = "log";
hbaseRepository.selectTableAllDataMap(tableName,columnFamily);
}
@Test
public void selectoneRowDataMap(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
hbaseRepository.selectoneRowDataMap(tableName,rowNumber);
}
@Test
public void selectColumnValue(){
String tableName = "david_topic:topictest";
String rowNumber = "105155";
String columnFamily = "info";
String column = "title";
hbaseRepository.selectColumnValue(tableName,rowNumber,columnFamily,column);
}
@Test
public void deleteDataByRowNumberAndColumnFamily(){
String tableName = "david_topic:topictest";
String columnFamily = "info";
hbaseRepository.deleteDataByColumnFamily(tableName,columnFamily);
}
@Test
public void selectTableDataByFilter(){
String tableName = "david_topic:topictest";
List<String> queryParam = Arrays.asList("id,105155","dept,营销中心");
hbaseRepository.selectTableDataByFilter(tableName, "info", queryParam, ",",true);
}
@Test
public void selectColumnValueDataByFilter(){
String tableName = "david_topic:topictest";
List<String> queryParam = Arrays.asList("topicFileId,6282");
hbaseRepository.selectColumnValueDataByFilter(tableName, "info", queryParam, ",","id",true);
}
@Test
public void selectTableAllDataMapAllPage(){
String tableName ="david_topic:topictest";
int pageSize = 3;
String key = null;
int dataCount = pageSize;
while (dataCount == pageSize) {
// key is null on the first pass; afterwards it holds the last row key of the previous page
List<Map<String, Object>> result = hbaseRepository.selectTableAllDataMapPage(tableName, pageSize, key);
if(result != null && result.size() > 0 ){
dataCount = result.size();
key = result.get(dataCount-1).get("rowKey").toString();
System.out.println(key);
}else{
break;
}
}
}
@Test
public void selectTableAllDataMapPage(){
String tableName ="david_topic:topictest";
int pageSize = 3;
List<Map<String, Object>> result = hbaseRepository.selectTableAllDataMapPage(tableName, pageSize, "32956703305764864");
System.out.println(result);
}
}
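The selectTableAllDataMapAllPage test above walks the table page by page, feeding the last row key of one page into the next call. The sketch below reproduces that loop against a sorted in-memory map so the stopping condition is easy to follow. It rests on one assumption: the start key is exclusive, so the last row of a page does not reappear at the top of the next. Whether selectTableAllDataMapPage behaves that way is not shown in this part of the code. PagingSketch, page and scanAll are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a table; HBase also keeps rows sorted by row key.
public class PagingSketch {
    private final NavigableMap<String, String> rows = new TreeMap<>();

    public void put(String rowKey, String value) {
        rows.put(rowKey, value);
    }

    /**
     * Returns up to pageSize row keys that sort strictly after lastRowKey.
     * Passing null starts from the first row.
     */
    public List<String> page(int pageSize, String lastRowKey) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        NavigableMap<String, String> tail =
                lastRowKey == null ? rows : rows.tailMap(lastRowKey, false);
        List<String> keys = new ArrayList<>();
        for (String key : tail.keySet()) {
            if (keys.size() == pageSize) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }

    /** Walks every page and collects all row keys in order. */
    public List<String> scanAll(int pageSize) {
        List<String> all = new ArrayList<>();
        String lastKey = null;
        while (true) {
            List<String> current = page(pageSize, lastKey);
            all.addAll(current);
            if (current.size() < pageSize) {
                break; // a short or empty page means there is nothing left
            }
            lastKey = current.get(current.size() - 1);
        }
        return all;
    }

    public static void main(String[] args) {
        PagingSketch table = new PagingSketch();
        for (int i = 1; i <= 7; i++) {
            table.put("row" + i, "v" + i);
        }
        System.out.println(table.page(3, null));   // [row1, row2, row3]
        System.out.println(table.page(3, "row3")); // [row4, row5, row6]
        System.out.println(table.scanAll(3).size()); // 7
    }
}
```

If the real method treats the start key as inclusive, each page after the first would begin with the previous page's last row, and the caller would need to drop that duplicate.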
I hope this article is helpful and makes your work more efficient.



