栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

Spring Boot项目中使用HBase2.0.5 Java API操作HBase

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Boot项目中使用HBase2.0.5 Java API操作HBase

最近项目使用到hbase当做数据库存储数据,经过多番网上搜索并把通用方法进行整合,对hbase操作的功能写成了自己的通用方法,现在公布出来希望对大家有所帮助,后期有新的通用功能会实时更新的,通用方法不采用springboot集成的hbase,使用原生java API 操作hbase方法
开发环境
系统:windows
JDK:1.8
开发工具:Intellij IDEA
虚拟机:vmware
三台linux节点: 172.162.15.111、172.162.15.112、172.162.15.113
Linux:CentOS 7 迷你版 CentOS-7-x86_64-Minimal-1708.iso
Hadoop: Hadoop-3.1.2
Hbase : hbase-2.0.5
Zookeeper: zookeeper-3.4.1
第二步:配置windows系统host,根据您的需求,这一步可有可无
#虚拟机访问地址
172.162.15.111 davidone
172.162.15.112 davidtwo
172.162.15.113 davidthree
第三步:项目结构配置pom.xml


     org.springframework.boot
     spring-boot-configuration-processor
     true


     org.apache.hbase
  hbase-client
     2.0.5
     
  
      org.slf4j
      slf4j-log4j12
  
  
      log4j
      log4j
  
  
      javax.servlet
      servlet-api
  
     
  

application.yml 这个配置以增量的方式添加到文件

logging:
  config: classpath:Logback.xml
bigdata:
  config:
    hbaseConfigMap: {hbase.zookeeper.quorum : 'davidone:2181,davidtwo:2181,davidthree:2181'}

第四步:核心代码

@Component()
@ConfigurationProperties(prefix = "bigdata.config")
public class BigDataConfig {
	private Map hbaseConfigMap;
    public Map gethbaseConfigMap() {
 return hbaseConfigMap;
    }
    public void sethbaseConfigMap(Map hbaseConfigMap) {
 this.hbaseConfigMap = hbaseConfigMap;
    }
}

HbaseRepository.java 通用操作hbase数据代码,包含了通用的二级搜索,分页,多条件查询,要求对hbase shell 操作有一定的了解后,能够很好的了解这个通用方法
建议去看下这位博主写的文章:Hbase学习笔记
分段进行对HbaseRepository进行展示
4.1、对hbase进行初始化

	@Autowired
    private BigDataConfig bigDataConfig;

    private static Configuration configuration = HbaseConfiguration.create();
    //设置连接池
    private static ExecutorService executorServicePoolSize = Executors.newScheduledThreadPool(20);
    private ThreadLocal connectionThreadLocal = new ThreadLocal<>();
    private ThreadLocal adminThreadLocal = new ThreadLocal<>();
    private static Connection connection = null;
    private static Admin admin = null;
    private final byte[] POSTFIX = new byte[] { 0x00 };
    
    private void initConfigurationInfo(){
 Map hbaseConfigMap = bigDataConfig.gethbaseConfigMap();
 if(hbaseConfigMap.size() == 0){
     logger.debug(MessageFormat.format("Hbase配置信息初始化失败:{0}",JSON.toJSonString(hbaseConfigMap)));
 }else{
     for (Map.Entry confEntry : hbaseConfigMap.entrySet()) {
  configuration.set(confEntry.getKey(), confEntry.getValue());
     }
 }
    }
    
    private  void initHbaseClientAdmin(){
 try{
     admin = adminThreadLocal.get();
     if(admin == null && connection != null){
  admin = connection.getAdmin();
  adminThreadLocal.set(admin);
     }else{
  logger.debug(MessageFormat.format("创建hbase connection连接 失败:{0}",connection));
     }
 }catch (Exception e){
     logger.error(MessageFormat.format("初始化hbase client admin 客户端管理失败:错误信息:{0}",e));
 }
    }
    
    public void initRepository(){
 try{
     initConfigurationInfo();
     connection = connectionThreadLocal.get();
     if(connection == null){
  connection = ConnectionFactory.createConnection(configuration, executorServicePoolSize);
  connectionThreadLocal.set(connection);
     }
     initHbaseClientAdmin();
 }catch (Exception e){
     logger.error(MessageFormat.format("创建hbase connection 连接失败:{0}",e));
     e.printStackTrace();
 }finally {
     close(admin,null,null);
 }
    }

4.2、提取使用到的通用方法

    private Table getTable(String tableName) {
 try {
     return connection.getTable(TableName.valueOf(tableName));
 }catch (Exception e){
     e.printStackTrace();
     return  null;
 }
}

    private int createTable(String tableName,List columnFamily) throws Exception {
 if(admin.tableExists(TableName.valueOf(tableName))){
     logger.debug(MessageFormat.format("创建Hbase表名:{0} 在Hbase数据库中已经存在",tableName));
     return 2;
 }else{
     List familyDescriptors =new ArrayList<>(columnFamily.size());
     for(String column : columnFamily){
  familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
     }
     TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
      .setColumnFamilies(familyDescriptors).build();
     admin.createTable(tableDescriptor);
     logger.info(MessageFormat.format("创建表成功:表名:{0},列簇:{1}",tableName,JSON.toJSonString(columnFamily)));
     return 1;
 }
    }

    private FilterList handlePageFilterData(Scan scan, int pageSize,String lastRowKey){
 Filter pageFilter = new PageFilter(pageSize);
 FilterList pageFilterList = new FilterList();
 pageFilterList.addFilter(pageFilter);
 if(!StringUtils.isEmpty(lastRowKey)){
     byte[] startRow = Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX);
     scan.setStartRow(startRow);
 }
 return pageFilterList;
    }

    private FilterList queryFilterData(String columnFamily,List queryParam,String regex,boolean bool){
 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
 if(!bool){
     filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
 }
 for(String param: queryParam){
     String[] queryArray = param.split(regex);
     SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
     singleColumnValueFilter.setFilterIfMissing(true);
     filterList.addFilter(singleColumnValueFilter);
 }
 return filterList;
    }


    private List> queryData(Table table,Scan scan){
 ResultScanner resultScanner =null;
 List> resultList = new ArrayList<>();
 try {
     resultScanner = table.getScanner(scan);
     for(Result result : resultScanner){
  logger.info(MessageFormat.format("查询每条Hbase数据的行号:{0}",Bytes.toString(result.getRow())));
  Map resultMap = new HashMap<>();
  resultMap.put("rowKey",Bytes.toString(result.getRow()));
  for(Cell cell :result.listCells()){
      resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
      );
  }
  resultList.add(resultMap);
     }
     logger.info(MessageFormat.format("查询指定表中数据信息:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),JSON.toJSonString(resultList)));
 } catch (Exception e) {
     e.printStackTrace();
     logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
 }finally {
     close(null,resultScanner,table);
 }
 return resultList;
    }


    private int queryDataCount(Table table,Scan scan){
 scan.setFilter(new FirstKeyonlyFilter());
 ResultScanner resultScanner = null;
 int rowCount = 0;
 try {
     resultScanner = table.getScanner(scan);
     for(Result result : resultScanner){
  rowCount += result.size();
     }
     logger.info(MessageFormat.format("统计全表数据总数:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),rowCount));
     return rowCount;
 } catch (Exception e) {
     e.printStackTrace();
     logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
     return rowCount;
 }finally {
     close(null,resultScanner,table);
 }
    }


    private void close(Admin admin, ResultScanner rs, Table table){
 if(admin != null){
     try {
  admin.close();
     } catch (IOException e) {
  logger.error("关闭Admin失败",e);
     }
 }

 if(rs != null){
     rs.close();
 }

 if(table != null){
     try {
  table.close();
     } catch (IOException e) {
  logger.error("关闭Table失败",e);
     }
 }
    }

4.3、根据个人需求可以同时创建单张或多张数据表

    public boolean createManyTable(Map> tableMap){
 try{
     if(admin != null){
  for (Map.Entry> confEntry : tableMap.entrySet()) {
      createTable(confEntry.getKey(), confEntry.getValue());
  }
     }
 }catch (Exception e){
     logger.error(MessageFormat.format("创建多个表出现未知错误:{0}",e.getMessage()));
     e.printStackTrace();
     return false;
 }finally {
     close(admin,null,null);
 }
 return true;
    }


    
    public int createoneTable (String tableName,String... columnFamily){
 try{
     //创建表,先查看表是否存在,然后在删除重新创建
     if(admin != null){
  return createTable(tableName,Arrays.asList(columnFamily));
     }else{
  logger.error("admin变量没有初始化成功");
  return 0;
     }
 }catch (Exception e){
     logger.debug(MessageFormat.format("创建表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
     e.printStackTrace();
     return 0;
 }finally {
     close(admin,null,null);
 }
    }

4.4、同时添加或更新多列和单列

    public boolean insertManyColumnRecords(String tableName,String row,String columnFamily,List columns,List values){
 try{
     Table table = getTable(tableName);
     Put put = new Put(Bytes.toBytes(row));
     for(int i = 0; i < columns.size(); i++){
  put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columns.get(i)),Bytes.toBytes(values.get(i)));
  table.put(put);
     }
     logger.info(MessageFormat.format("添加单行单列族-多列多值数据成功:表名:{0},列名:{1},列值:{2}",tableName, JSON.toJSonString(columns),JSON.toJSonString(values)));
     return  true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("添加单行单列族-多列多值数据失败:表名:{0};错误信息是:{1}",tableName,e.getMessage()));
     return false;
 }
    }


    
    public boolean insertoneColumnRecords(String tableName,String row,String columnFamily,String column,String value){
 try{
     Table table = getTable(tableName);
     Put put = new Put(Bytes.toBytes(row));
     put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
     table.put(put);
     logger.info(MessageFormat.format("添加单行单列族-单列单值数据成功:表名:{0},列名:{1},列值:{2}",tableName,column,value));
     return true;
 }catch (Exception e){
     logger.debug(MessageFormat.format("添加单行单列族-单列单值数据失败:表名:{0},错误信息是:{1}",tableName,e.getMessage()));
     e.printStackTrace();
     return false;
 }
    }

4.5、对hbase 做删除数据或是删除表的操作

    public boolean deleteDataByRowNumber(String tableName,String rowNumber){
try{
    Table table = getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowNumber));
    table.delete(delete);
    logger.info(MessageFormat.format("根据行号删除表中记录成功:表名:{0},行号:{1}",tableName,rowNumber));
    return true;
}catch (Exception e){
     e.printStackTrace();
    logger.debug(MessageFormat.format("根据行号删除表中记录失败:表名:{0},行号:{1}",tableName,rowNumber));
     return false;
}
    }


    
    public boolean deleteDataByColumnFamily(String tableName,String columnFamily){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号和列簇名称删除这行列簇相关的数据失败:表名不存在:{0}",tableName));
  return false;
     }
     admin.deleteColumnFamily(TableName.valueOf(tableName),Bytes.toBytes(columnFamily));
     logger.info(MessageFormat.format("删除该表中列簇下所有数据成功:表名:{0},列簇:{1}",tableName,columnFamily));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("删除该表中列簇下所有数据失败:表名:{0},列簇:{1},错误信息:{2}",tableName,columnFamily,e.getMessage()));
     return false;
 }
    }

    
    public  boolean deleteDataByColumn(String tableName,String rowNumber,String columnFamily,String cloumn){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名不存在:{0}",tableName));
  return false;
     }
     Table table = getTable(tableName);
     Delete delete = new Delete(rowNumber.getBytes());
     delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
     table.delete(delete);
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
     return false;
 }
    }

    
    public  boolean deleteDataByAllcolumn(String tableName,String rowNumber,String columnFamily,String cloumn){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名不存在:{0}",tableName));
  return false;
     }
     Table table = getTable(tableName);

     Delete delete = new Delete(rowNumber.getBytes());
     delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
     table.delete(delete);
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.error(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
     return false;
 }
    }

    
    public  boolean  deleteTable(String tableName){
 try{
     TableName table = TableName.valueOf(tableName);
     if(admin.tableExists(table)){
  //禁止使用表,然后删除表
  admin.disableTable(table);
  admin.deleteTable(table);
     }
     logger.info(MessageFormat.format("删除表成功:{0}",tableName));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("删除表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
     return false;
 }finally {
     close(admin,null,null);
 }
    }

4.6、对hbase查询操作 查询条件包含着and 或or 多条件查询,但是没有模糊查询

    public List getAllTableNames(){
 List resultList = new ArrayList<>();
 try {
     TableName[] tableNames = admin.listTableNames();
     for(TableName tableName : tableNames){
  resultList.add(tableName.getNameAsString());
     }
     logger.info(MessageFormat.format("查询库中所有表的表名成功:{0}",JSON.toJSonString(resultList)));
 }catch (IOException e) {
     logger.error("获取所有表的表名失败",e);
 }finally {
     close(admin,null,null);
 }
 return resultList;
    }


    
    public Map selectoneRowDataMap(String tableName,String rowNumber){
 Map resultMap = new HashMap<>();
 Get get = new Get(Bytes.toBytes(rowNumber));
 Table table = getTable(tableName);
 try{
     Result result = table.get(get);
     if(result !=null && !result.isEmpty()){
  for(Cell cell : result.listCells()){
      resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
      );
  }
     }
     logger.info(MessageFormat.format("根据表名和行号查询数据:表名:{0},行号:{1},查询结果:{2}",tableName,rowNumber,JSON.toJSonString(resultMap)));
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("根据表名和行号查询数据失败:表名:{0},行号:{1},错误信息:{2}",tableName,rowNumber,e.getMessage()));
 }finally {
     close(null,null,table);
 }
 return resultMap;
    }


    
    public List> selectTableDataByFilter(String tableName,String columnFamily,List queryParam,String regex,boolean bool){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
 scan.setFilter(filterList);
 return queryData(table,scan);
}

    public List> selectColumnValueDataByFilter(String tableName,String columnFamily,List queryParam,String regex,String column,boolean bool){
 Scan scan = new Scan();
 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
 if(!bool){
     filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
 }
 Table table = getTable(tableName);
 for(String param: queryParam){
     String[] queryArray = param.split(regex);
     SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
     filterList.addFilter(singleColumnValueExcludeFilter);
     scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(queryArray[0]));
 }
 scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
 scan.setFilter(filterList);
 return queryData(table,scan);

    }

    
    public List> selectTableAllDataMap(String tableName){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 return queryData(table,scan);
    }

    public List> selectTableAllDataMap(String tableName,String columnFamily){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes(columnFamily));
 return queryData(table,scan);
    }

    
    public Map selectTableByRowNumberAndColumnFamily(String tableName,String rowNumber,String columnFamily){
 ResultScanner resultScanner = null;
 Map resultMap = new HashMap<>();
 Table table = getTable(tableName);
 try {
     Get get = new Get(Bytes.toBytes(rowNumber));
     get.addFamily(Bytes.toBytes(columnFamily));
     Result result = table.get(get);
     for(Cell cell :result.listCells()){
  resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
   Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
  );
     }
     logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},查询结果:{3}",tableName,rowNumber,columnFamily,JSON.toJSonString(resultMap)));
 } catch (IOException e) {
     e.printStackTrace();
     logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},错误信息:{3}",tableName,rowNumber,columnFamily,e.getMessage()));
 }finally {
     close(null,resultScanner,table);
 }
 return resultMap;
    }

    
    public String selectColumnValue(String tableName,String rowNumber,String columnFamily,String column){
 Table table = getTable(tableName);
 try {
     Get get = new Get(Bytes.toBytes(rowNumber));
     get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
     Result result = table.get(get);
     logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},查询结果:{4}",tableName,rowNumber,columnFamily,column,Bytes.toString(result.value())));
     return Bytes.toString(result.value());
 } catch (IOException e) {
     e.printStackTrace();
     logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},错误信息:{4}",tableName,rowNumber,columnFamily,column,e.getMessage()));
     return "";
 }finally {
     close(null,null,table);
 }
    }

4.7、对hbase分页查询

    public List> selectTableDataByFilterPage(String tableName,String columnFamily,List queryParam,String regex,boolean bool,int pageSize,String lastRow){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
 FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
 pageFilterList.addFilter(filterList);
 scan.setFilter(pageFilterList);
 return queryData(table,scan);
}

    public List> selectTableAllDataMapPage(String tableName,int pageSize,String lastRow){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
 scan.setFilter(pageFilterList);
 return queryData(table,scan);
    }

下面是完整的HbaseRepository

@Component
public class HbaseRepository{
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BigDataConfig bigDataConfig;
    private static Configuration configuration = HbaseConfiguration.create();
    //设置连接池
    private static ExecutorService executorServicePoolSize = Executors.newScheduledThreadPool(20);
    private ThreadLocal connectionThreadLocal = new ThreadLocal<>();
    private ThreadLocal adminThreadLocal = new ThreadLocal<>();
    private static Connection connection = null;
    private static Admin admin = null;
    private final byte[] POSTFIX = new byte[] { 0x00 };
    
    private void initConfigurationInfo(){
 Map hbaseConfigMap = bigDataConfig.gethbaseConfigMap();
 if(hbaseConfigMap.size() == 0){
     logger.debug(MessageFormat.format("Hbase配置信息初始化失败:{0}",JSON.toJSonString(hbaseConfigMap)));
 }else{
     for (Map.Entry confEntry : hbaseConfigMap.entrySet()) {
  configuration.set(confEntry.getKey(), confEntry.getValue());
     }
 }
    }
    
    private  void initHbaseClientAdmin(){
 try{
     admin = adminThreadLocal.get();
     if(admin == null && connection != null){
  admin = connection.getAdmin();
  adminThreadLocal.set(admin);
     }else{
  logger.debug(MessageFormat.format("创建hbase connection连接 失败:{0}",connection));
     }
 }catch (Exception e){
     logger.error(MessageFormat.format("初始化hbase client admin 客户端管理失败:错误信息:{0}",e));
 }
    }
    
    public void initRepository(){
 try{
     initConfigurationInfo();
     connection = connectionThreadLocal.get();
     if(connection == null){
  connection = ConnectionFactory.createConnection(configuration, executorServicePoolSize);
  connectionThreadLocal.set(connection);
     }
     initHbaseClientAdmin();
 }catch (Exception e){
     logger.error(MessageFormat.format("创建hbase connection 连接失败:{0}",e));
     e.printStackTrace();
 }finally {
     close(admin,null,null);
 }

    }
    
    private Table getTable(String tableName) {
 try {
     return connection.getTable(TableName.valueOf(tableName));
 }catch (Exception e){
     e.printStackTrace();
     return  null;
 }
    }
    
    public boolean createManyTable(Map> tableMap){
 try{
     if(admin != null){
  for (Map.Entry> confEntry : tableMap.entrySet()) {
      createTable(confEntry.getKey(), confEntry.getValue());
  }
     }
 }catch (Exception e){
     logger.error(MessageFormat.format("创建多个表出现未知错误:{0}",e.getMessage()));
     e.printStackTrace();
     return false;
 }finally {
     close(admin,null,null);
 }
 return true;
    }
    
    public int createoneTable (String tableName,String... columnFamily){
 try{
     //创建表,先查看表是否存在,然后在删除重新创建
     if(admin != null){
  return createTable(tableName,Arrays.asList(columnFamily));
     }else{
  logger.error("admin变量没有初始化成功");
  return 0;
     }
 }catch (Exception e){
     logger.debug(MessageFormat.format("创建表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
     e.printStackTrace();
     return 0;
 }finally {
     close(admin,null,null);
 }
    }
    
    private int createTable(String tableName,List columnFamily) throws Exception {
 if(admin.tableExists(TableName.valueOf(tableName))){
     logger.debug(MessageFormat.format("创建Hbase表名:{0} 在Hbase数据库中已经存在",tableName));
     return 2;
 }else{
     List familyDescriptors =new ArrayList<>(columnFamily.size());
     for(String column : columnFamily){
  familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
     }
     TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
      .setColumnFamilies(familyDescriptors).build();
     admin.createTable(tableDescriptor);
     logger.info(MessageFormat.format("创建表成功:表名:{0},列簇:{1}",tableName,JSON.toJSonString(columnFamily)));
     return 1;
 }
    }
    
    public boolean insertManyColumnRecords(String tableName,String row,String columnFamily,List columns,List values){
 try{
     Table table = getTable(tableName);
     Put put = new Put(Bytes.toBytes(row));
     for(int i = 0; i < columns.size(); i++){
  put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columns.get(i)),Bytes.toBytes(values.get(i)));
  table.put(put);
     }
     logger.info(MessageFormat.format("添加单行单列族-多列多值数据成功:表名:{0},列名:{1},列值:{2}",tableName, JSON.toJSonString(columns),JSON.toJSonString(values)));
     return  true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("添加单行单列族-多列多值数据失败:表名:{0};错误信息是:{1}",tableName,e.getMessage()));
     return false;
 }
    }
    
    public boolean insertoneColumnRecords(String tableName,String row,String columnFamily,String column,String value){
 try{
     Table table = getTable(tableName);
     Put put = new Put(Bytes.toBytes(row));
     put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
     table.put(put);
     logger.info(MessageFormat.format("添加单行单列族-单列单值数据成功:表名:{0},列名:{1},列值:{2}",tableName,column,value));
     return true;
 }catch (Exception e){
     logger.debug(MessageFormat.format("添加单行单列族-单列单值数据失败:表名:{0},错误信息是:{1}",tableName,e.getMessage()));
     e.printStackTrace();
     return false;
 }
    }
    
    public boolean deleteDataByRowNumber(String tableName,String rowNumber){
try{
    Table table = getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowNumber));
    table.delete(delete);
    logger.info(MessageFormat.format("根据行号删除表中记录成功:表名:{0},行号:{1}",tableName,rowNumber));
    return true;
}catch (Exception e){
     e.printStackTrace();
    logger.debug(MessageFormat.format("根据行号删除表中记录失败:表名:{0},行号:{1}",tableName,rowNumber));
     return false;
}
    }
    
    public boolean deleteDataByColumnFamily(String tableName,String columnFamily){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号和列簇名称删除这行列簇相关的数据失败:表名不存在:{0}",tableName));
  return false;
     }
     admin.deleteColumnFamily(TableName.valueOf(tableName),Bytes.toBytes(columnFamily));
     logger.info(MessageFormat.format("删除该表中列簇下所有数据成功:表名:{0},列簇:{1}",tableName,columnFamily));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("删除该表中列簇下所有数据失败:表名:{0},列簇:{1},错误信息:{2}",tableName,columnFamily,e.getMessage()));
     return false;
 }
    }
    
    public  boolean deleteDataByColumn(String tableName,String rowNumber,String columnFamily,String cloumn){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名不存在:{0}",tableName));
  return false;
     }
     Table table = getTable(tableName);
     Delete delete = new Delete(rowNumber.getBytes());
     delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
     table.delete(delete);
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->删除最新列,保留旧列失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
     return false;
 }
    }
    
    public  boolean deleteDataByAllcolumn(String tableName,String rowNumber,String columnFamily,String cloumn){
 try{
     if(!admin.tableExists(TableName.valueOf(tableName))){
  logger.debug(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名不存在:{0}",tableName));
  return false;
     }
     Table table = getTable(tableName);

     Delete delete = new Delete(rowNumber.getBytes());
     delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(cloumn));
     table.delete(delete);
     logger.info(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除成功:表名:{0},行号:{1},列簇:{2},列:{3}",tableName,rowNumber,columnFamily,cloumn));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.error(MessageFormat.format("根据行号表名列簇删除指定列 ->新旧列都会删除失败:表名:{0},行号:{1},列簇:{2},列:{3},错误信息:{4}",tableName,rowNumber,columnFamily,cloumn,e.getMessage()));
     return false;
 }
    }
    
    public  boolean  deleteTable(String tableName){
 try{
     TableName table = TableName.valueOf(tableName);
     if(admin.tableExists(table)){
  //禁止使用表,然后删除表
  admin.disableTable(table);
  admin.deleteTable(table);
     }
     logger.info(MessageFormat.format("删除表成功:{0}",tableName));
     return true;
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("删除表失败:{0},错误信息是:{1}",tableName,e.getMessage()));
     return false;
 }finally {
     close(admin,null,null);
 }
    }

    
    public List getAllTableNames(){
 List resultList = new ArrayList<>();
 try {
     TableName[] tableNames = admin.listTableNames();
     for(TableName tableName : tableNames){
  resultList.add(tableName.getNameAsString());
     }
     logger.info(MessageFormat.format("查询库中所有表的表名成功:{0}",JSON.toJSonString(resultList)));
 }catch (IOException e) {
     logger.error("获取所有表的表名失败",e);
 }finally {
     close(admin,null,null);
 }
 return resultList;
    }

    
    public Map selectoneRowDataMap(String tableName,String rowNumber){
 Map resultMap = new HashMap<>();
 Get get = new Get(Bytes.toBytes(rowNumber));
 Table table = getTable(tableName);
 try{
     Result result = table.get(get);
     if(result !=null && !result.isEmpty()){
  for(Cell cell : result.listCells()){
      resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
      );
  }
     }
     logger.info(MessageFormat.format("根据表名和行号查询数据:表名:{0},行号:{1},查询结果:{2}",tableName,rowNumber,JSON.toJSonString(resultMap)));
 }catch (Exception e){
     e.printStackTrace();
     logger.debug(MessageFormat.format("根据表名和行号查询数据失败:表名:{0},行号:{1},错误信息:{2}",tableName,rowNumber,e.getMessage()));
 }finally {
     close(null,null,table);
 }
 return resultMap;
    }

    
    public List> selectTableDataByFilter(String tableName,String columnFamily,List queryParam,String regex,boolean bool){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
 scan.setFilter(filterList);
 return queryData(table,scan);
    }

    
    public List> selectTableDataByFilterPage(String tableName,String columnFamily,List queryParam,String regex,boolean bool,int pageSize,String lastRow){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList filterList = queryFilterData(columnFamily, queryParam, regex, bool);
 FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
 pageFilterList.addFilter(filterList);
 scan.setFilter(pageFilterList);
 return queryData(table,scan);
    }

    
    private FilterList handlePageFilterData(Scan scan, int pageSize,String lastRowKey){
 Filter pageFilter = new PageFilter(pageSize);
 FilterList pageFilterList = new FilterList();
 pageFilterList.addFilter(pageFilter);
 if(!StringUtils.isEmpty(lastRowKey)){
     byte[] startRow = Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX);
     scan.setStartRow(startRow);
 }
 return pageFilterList;
    }
    
    private FilterList queryFilterData(String columnFamily,List queryParam,String regex,boolean bool){
 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
 if(!bool){
     filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
 }
 for(String param: queryParam){
     String[] queryArray = param.split(regex);
     SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
     singleColumnValueFilter.setFilterIfMissing(true);
     filterList.addFilter(singleColumnValueFilter);
 }
 return filterList;
    }
    
    public List> selectColumnValueDataByFilter(String tableName,String columnFamily,List queryParam,String regex,String column,boolean bool){
 Scan scan = new Scan();
 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
 if(!bool){
     filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
 }
 Table table = getTable(tableName);
 for(String param: queryParam){
     String[] queryArray = param.split(regex);
     SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(queryArray[0]), CompareOperator.EQUAL,Bytes.toBytes(queryArray[1]));
     filterList.addFilter(singleColumnValueExcludeFilter);
     scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(queryArray[0]));
 }
 scan.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
 scan.setFilter(filterList);
 return queryData(table,scan);

    }

    
    public List> selectTableAllDataMap(String tableName){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 return queryData(table,scan);
    }

    
    public List> selectTableAllDataMapPage(String tableName,int pageSize,String lastRow){
 Scan scan = new Scan();
 Table table = getTable(tableName);
 FilterList pageFilterList = handlePageFilterData(scan, pageSize, lastRow);
 scan.setFilter(pageFilterList);
 return queryData(table,scan);
    }


    
    public List> selectTableAllDataMap(String tableName,String columnFamily){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes(columnFamily));
 return queryData(table,scan);
    }

    
    public Map selectTableByRowNumberAndColumnFamily(String tableName,String rowNumber,String columnFamily){
 ResultScanner resultScanner = null;
 Map resultMap = new HashMap<>();
 Table table = getTable(tableName);
 try {
     Get get = new Get(Bytes.toBytes(rowNumber));
     get.addFamily(Bytes.toBytes(columnFamily));
     Result result = table.get(get);
     for(Cell cell :result.listCells()){
  resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
   Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
  );
     }
     logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},查询结果:{3}",tableName,rowNumber,columnFamily,JSON.toJSonString(resultMap)));
 } catch (IOException e) {
     e.printStackTrace();
     logger.info(MessageFormat.format("根据行号和列簇查询表中数据信息:表名:{0},行号:{1},列簇:{2},错误信息:{3}",tableName,rowNumber,columnFamily,e.getMessage()));
 }finally {
     close(null,resultScanner,table);
 }
 return resultMap;
    }

    
    public String selectColumnValue(String tableName,String rowNumber,String columnFamily,String column){
 Table table = getTable(tableName);
 try {
     Get get = new Get(Bytes.toBytes(rowNumber));
     get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
     Result result = table.get(get);
     logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},查询结果:{4}",tableName,rowNumber,columnFamily,column,Bytes.toString(result.value())));
     return Bytes.toString(result.value());
 } catch (IOException e) {
     e.printStackTrace();
     logger.info(MessageFormat.format("根据表名、行号、列簇、列查询指定列的值:表名:{0},行号:{1},列簇:{2},列名:{3},错误信息:{4}",tableName,rowNumber,columnFamily,column,e.getMessage()));
     return "";
 }finally {
     close(null,null,table);
 }
    }

    private List> queryData(Table table,Scan scan){
 ResultScanner resultScanner =null;
 List> resultList = new ArrayList<>();
 try {
     resultScanner = table.getScanner(scan);
     for(Result result : resultScanner){
  logger.info(MessageFormat.format("查询每条Hbase数据的行号:{0}",Bytes.toString(result.getRow())));
  Map resultMap = new HashMap<>();
  resultMap.put("rowKey",Bytes.toString(result.getRow()));
  for(Cell cell :result.listCells()){
      resultMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
      );
  }
  resultList.add(resultMap);
     }
     logger.info(MessageFormat.format("查询指定表中数据信息:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),JSON.toJSonString(resultList)));
 } catch (Exception e) {
     e.printStackTrace();
     logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
 }finally {
     close(null,resultScanner,table);
 }
 return resultList;
    }
    
    public int getTableDataCount(String tableName){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 return queryDataCount(table,scan);

    }

    
    public int getTableDataCount(String tableName,String columnFamily){
 Table table = getTable(tableName);
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes(columnFamily));
 return queryDataCount(table,scan);
    }
    private int queryDataCount(Table table,Scan scan){
 scan.setFilter(new FirstKeyonlyFilter());
 ResultScanner resultScanner = null;
 int rowCount = 0;
 try {
     resultScanner = table.getScanner(scan);
     for(Result result : resultScanner){
  rowCount += result.size();
     }
     logger.info(MessageFormat.format("统计全表数据总数:表名:{0},查询结果:{1}",Bytes.toString(table.getName().getName()),rowCount));
     return rowCount;
 } catch (Exception e) {
     e.printStackTrace();
     logger.debug(MessageFormat.format("查询指定表中数据信息:表名:{0},错误信息:{1}",Bytes.toString(table.getName().getName()),e.getMessage()));
     return rowCount;
 }finally {
     close(null,resultScanner,table);
 }
    }
    
    private void close(Admin admin, ResultScanner rs, Table table){
 if(admin != null){
     try {
  admin.close();
     } catch (IOException e) {
  logger.error("关闭Admin失败",e);
     }
 }
 if(rs != null){
     rs.close();
 }
 if(table != null){
     try {
  table.close();
     } catch (IOException e) {
  logger.error("关闭Table失败",e);
     }
 }
    }
}

针对HbaseRepository通用方法的测试用例

@RunWith(SpringRunner.class)
@SpringBootTest
public class HbaseUtilsTest {

    @Autowired
    private HbaseRepository hbaseRepository;
	
    @Test
    public void initTable(){
 hbaseRepository.initRepository();
 Map> tableMap = new HashMap<>();
 tableMap.put("david_topic:actionFlow",Arrays.asList("info","logs"));
 tableMap.put("david_topic:recording",Arrays.asList("info","logs"));
 tableMap.put("david_topic:syncLog",Arrays.asList("info","logs"));
 tableMap.put("david_topic:uploadVersion",Arrays.asList("info","logs"));
 hbaseRepository.createManyTable(tableMap);
    }

    @Test
    public void createoneTable(){
 //在指定命名空间创建表
 hbaseRepository.createoneTable("david_topic:topictest","info","log");
    }
    @Test
    public void deleteTable(){
 hbaseRepository.deleteTable("david_topic:topictest");
    }
    @Test
    public void insertManyColumnRecords(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 String columnFamily = "info";
 List columns = Arrays.asList("id", "title", "poform", "dept", "level", "createUser");
 List values = Arrays.asList("105155", "瑞风S4品牌临时页面", "Mobile", "营销中心", "普通", "王丽");
 hbaseRepository.insertManyColumnRecords(tableName, rowNumber, columnFamily, columns, values);
    }
    @Test
    public  void selectTableByRowNumberAndColumnFamily(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 String columnFamily = "info";
 hbaseRepository.selectTableByRowNumberAndColumnFamily(tableName,rowNumber,columnFamily);
    }

    @Test
    public void getAllTableNames(){
 hbaseRepository.getAllTableNames();
    }

    @Test
    public void insertoneColumnRecords(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 String columnFamily = "info";
 hbaseRepository.insertoneColumnRecords(tableName, rowNumber, columnFamily, "delete", "否");
    }
    @Test
    public void deleteDataBycolumn(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 String columnFamily = "info";
 String column = "delete";
 hbaseRepository.deleteDataByColumn(tableName,rowNumber,columnFamily,column);
    }

    @Test
    public void getTableDataCount(){
 String tableName = "david_topic:topictest";
 hbaseRepository.getTableDataCount(tableName,"log");
    }

    @Test
    public void deleteDataByRowNumber(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 hbaseRepository.deleteDataByRowNumber(tableName, rowNumber);
    }


    @Test
    public void selectTableAllDataMap(){
 String tableName = "david_topic:topictest";
 hbaseRepository.selectTableAllDataMap(tableName);
    }

    @Test
    public void selectTableAllDataMapColumnFamily(){
 String tableName = "david_topic:topictest";
 String columnFamily = "log";
 hbaseRepository.selectTableAllDataMap(tableName,columnFamily);
    }

    @Test
    public void selectoneRowDataMap(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 hbaseRepository.selectoneRowDataMap(tableName,rowNumber);
    }

    @Test
    public void selectColumnValue(){
 String tableName = "david_topic:topictest";
 String rowNumber  = "105155";
 String columnFamily = "info";
 String column = "title";
 hbaseRepository.selectColumnValue(tableName,rowNumber,columnFamily,column);
    }


    @Test
    public void deleteDataByRowNumberAndColumnFamily(){
 String tableName = "david_topic:topictest";
 String columnFamily = "info";
 hbaseRepository.deleteDataByColumnFamily(tableName,columnFamily);
    }

    @Test
    public void selectTableDataByFilter(){
 String tableName = "david_topic:topictest";
 List queryParam = Arrays.asList("id,105155","dept,营销中心");
 hbaseRepository.selectTableDataByFilter(tableName, "info", queryParam, ",",true);
    }

    @Test
    public void selectColumnValueDataByFilter(){
 String tableName = "david_topic:topictest";
 List queryParam = Arrays.asList("topicFileId,6282");
 hbaseRepository.selectColumnValueDataByFilter(tableName, "info", queryParam, ",","id",true);
    }

    
    @Test
    public void selectTableAllDataMapAllPage(){
 String tableName ="david_topic:topictest";
 int pageSize = 3;
 String key = null;
 int dataCount = pageSize;
 while (dataCount == pageSize) {
     String mobileKey = null;
     if (key != null) {
  mobileKey = key;
     }
     List> result = hbaseRepository.selectTableAllDataMapPage(tableName, pageSize, mobileKey);
     if(result != null && result.size() > 0 ){
  dataCount = result.size();
  key = result.get(dataCount-1).get("rowKey").toString();
  System.out.println(key);
     }else{
  break;
     }
 }
    }

    
    @Test
    public void selectTableAllDataMapPage(){
 String tableName ="david_topic:topictest";
 int pageSize = 3;
 List> result = hbaseRepository.selectTableAllDataMapPage(tableName, pageSize, "32956703305764864");
 System.out.println(result);
    }

}

希望我文章能够对大家有所帮助,能够提高工作效率

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/234371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号