栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java Api实现HBase的增删改查(工具类)

Java Api实现HBase的增删改查(工具类)

1.

获取hbase配置文件的工具类
package com.dgindusoft.connector.util;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
@Order(value = 1)
public class GetPropertiesUtil {

    private static String quorum;

    @Value("${hbase.zookeeper.quorum}")
    public void setProQuorum(String quorumTemp) {
        quorum = quorumTemp;
    }

    public static String getQuorum() {
        return quorum;
    }
}

2.获取连接和关闭连接

package com.dgindusoft.connector.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Slf4j
@Component
@Order(value = 2)
public class HbaseConn {

    private static final HbaseConn INSTANCE = new HbaseConn();
    private static Configuration configuration;
    private static Connection connection;

    private HbaseConn() {
        try {
            if (configuration == null) {
                configuration = HbaseConfiguration.create();//创建配置对象,获取hbase的连接
                configuration.set("hbase.zookeeper.quorum", GetPropertiesUtil.getQuorum());
                log.debug("=======================================");
                log.debug(GetPropertiesUtil.getQuorum());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //获取数据库的连接
    private Connection getConnection() {
        if (connection == null || connection.isClosed()) {
            try {
                connection = ConnectionFactory.createConnection(configuration);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return connection;
    }

    public static Connection getHbaseConn() {
        return INSTANCE.getConnection();
    }

    public static Table getTable(String tableName) throws IOException {
        return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
    }

    public static void closeConn() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }
}

3.创建、删除表以及查看表是否存在、一次插入和批量插入

package com.dgindusoft.connector.util;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class HbaseUtil {

    
    public static boolean createTable(String tableName, List cfs) {
        try (HbaseAdmin admin = (HbaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {//获取操作对象admin
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return false;
            }
            List familyDescriptors = new ArrayList<>(cfs.size());
            for (String column : cfs) {
                familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
        }
        TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(familyDescriptors).build();
        admin.createTable(tableDescriptor);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    
    public static boolean isTableExist(String tableName) {
        try (HbaseAdmin admin = (HbaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }


    
    public static boolean deleteTable(String tableName) {
        try (HbaseAdmin admin = (HbaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    
    public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier,
                                 String data) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowKey));//把字符串变成字节序列
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));//增加列
            table.put(put);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
        return true;
    }

    public static boolean putRows(String tableName, List puts) {
        try (Table table = HbaseConn.getTable(tableName)) {
            table.put(puts);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
        return true;
    }
            }

4.hbase基本操作service层

package com.dgindusoft.connector.service;


import com.dgindusoft.connector.entity.Result;
import org.apache.hadoop.hbase.client.Put;

import java.util.List;
import java.util.Map;


public interface baseService {

    
    Result createTable(String tableName);

    
    Result insertData(String tableName, String deviceTime, String modelID, String deviceID, Map dataMap);

    
    Result insertData(String tableName, List puts);
    
    Result isTableExist(String tableName);
}

5.service层实现类

package com.dgindusoft.connector.service.impl;

import com.dgindusoft.connector.util.HbaseUtil;
import com.dgindusoft.connector.entity.Result;
import com.dgindusoft.connector.entity.ResultCodeEnum;
import com.dgindusoft.connector.service.baseService;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;


@Service
public class baseServiceImpl implements baseService {

    @Override
    public Result createTable(String tableName) {
        List cfs = new ArrayList<>();
        cfs.add("data");
        if (HbaseUtil.createTable(tableName, cfs)) {
            return Result.ok().message("表创建成功");
        } else {
            return Result.setResult(ResultCodeEnum.TABLE_EXIST_ERROR);
        }
    }

    @Override
    public Result insertData(String tableName, String deviceTime, String modelID, String deviceID, Map dataMap) {
//        String rowKey = tableName + LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
        String rowKey = modelID + deviceID + deviceTime;
        List puts = new ArrayList<>();
        Put put = new Put(Bytes.toBytes(rowKey));
        dataMap.forEach((key, value) -> {
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes(key),
                    Bytes.toBytes("" + value));
            puts.add(put);
        });
        try {
            HbaseUtil.putRows(tableName, puts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Result.ok().message("插入成功");
    }



    @Override
    public Result insertData(String tableName, List puts) {
        try {
            HbaseUtil.putRows(tableName, puts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Result.ok().message("插入成功");
    }

    @Override
    public Result isTableExist(String tableName) {
        Boolean result = HbaseUtil.isTableExist(tableName);
        if (result) {
            return Result.setResult(ResultCodeEnum.TABLE_EXIST_ERROR);
        } else {
            return Result.ok().message("该表不存在");
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583958.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号