1. 获取 HBase 配置项的工具类
package com.dgindusoft.connector.util;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Holder that makes the {@code hbase.zookeeper.quorum} property reachable from static code.
 *
 * <p>The value is stored in a static field by the {@code @Value}-annotated instance setter,
 * so {@link #getQuorum()} returns {@code null} until that setter has been invoked.
 */
@Component
@Order(value = 1)
public class GetPropertiesUtil {

    /** Quorum string most recently passed to {@link #setProQuorum(String)}; {@code null} before that. */
    private static String quorum;

    /**
     * Returns the stored ZooKeeper quorum.
     *
     * @return the value last given to {@link #setProQuorum(String)}, or {@code null} if it was never called
     */
    public static String getQuorum() {
        return GetPropertiesUtil.quorum;
    }

    /**
     * Stores the quorum in the class-wide static field.
     *
     * @param quorumTemp value resolved for the {@code ${hbase.zookeeper.quorum}} placeholder
     */
    @Value("${hbase.zookeeper.quorum}")
    public void setProQuorum(String quorumTemp) {
        GetPropertiesUtil.quorum = quorumTemp;
    }
}
2.获取连接和关闭连接
package com.dgindusoft.connector.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@Order(value = 2)
public class HbaseConn {
private static final HbaseConn INSTANCE = new HbaseConn();
private static Configuration configuration;
private static Connection connection;
private HbaseConn() {
try {
if (configuration == null) {
configuration = HbaseConfiguration.create();//创建配置对象,获取hbase的连接
configuration.set("hbase.zookeeper.quorum", GetPropertiesUtil.getQuorum());
log.debug("=======================================");
log.debug(GetPropertiesUtil.getQuorum());
}
} catch (Exception e) {
e.printStackTrace();
}
}
//获取数据库的连接
private Connection getConnection() {
if (connection == null || connection.isClosed()) {
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (Exception e) {
e.printStackTrace();
}
}
return connection;
}
public static Connection getHbaseConn() {
return INSTANCE.getConnection();
}
public static Table getTable(String tableName) throws IOException {
return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
}
public static void closeConn() {
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
}
3.创建、删除表以及查看表是否存在、一次插入和批量插入
package com.dgindusoft.connector.util;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Static helpers for HBase table administration and row writes, built on the shared
 * connection supplied by {@link HbaseConn}.
 *
 * <p>Each method reports failure through its boolean return value; caught exceptions are
 * printed and translated into {@code false}.
 */
public class HbaseUtil {

    /**
     * Creates a table with the given column families.
     *
     * @param tableName name of the table to create
     * @param cfs       column family names
     * @return {@code true} if the table was created by this call; {@code false} if it
     *         already exists or creation failed
     */
    public static boolean createTable(String tableName, List<String> cfs) {
        try (Admin admin = HbaseConn.getHbaseConn().getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                return false;
            }
            List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(cfs.size());
            for (String column : cfs) {
                familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
            }
            TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(name)
                    .setColumnFamilies(familyDescriptors)
                    .build();
            admin.createTable(tableDescriptor);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            // A failed creation must not be reported as success.
            return false;
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param tableName name of the table
     * @return {@code true} if the table exists; {@code false} if it does not or the check failed
     */
    public static boolean isTableExist(String tableName) {
        try (Admin admin = HbaseConn.getHbaseConn().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Disables (if needed) and deletes a table.
     *
     * @param tableName name of the table
     * @return {@code true} if the table was deleted; {@code false} if it does not exist
     *         or the operation failed
     */
    public static boolean deleteTable(String tableName) {
        try (Admin admin = HbaseConn.getHbaseConn().getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                return false;
            }
            // Disabling an already-disabled table fails, so only disable when enabled.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName target table
     * @param rowKey    row key
     * @param cfName    column family
     * @param qualifier column qualifier
     * @param data      cell value
     * @return {@code true} if the write completed; {@code false} on an I/O failure
     */
    public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier,
                                 String data) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));
            table.put(put);
            return true;
        } catch (IOException ioe) {
            ioe.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a batch of puts to one table.
     *
     * @param tableName target table
     * @param puts      puts to apply
     * @return {@code true} if the batch completed; {@code false} on an I/O failure
     */
    public static boolean putRows(String tableName, List<Put> puts) {
        try (Table table = HbaseConn.getTable(tableName)) {
            table.put(puts);
            return true;
        } catch (IOException ioe) {
            ioe.printStackTrace();
            return false;
        }
    }
}
4. HBase 基本操作 service 层
package com.dgindusoft.connector.service;
import com.dgindusoft.connector.entity.Result;
import org.apache.hadoop.hbase.client.Put;
import java.util.List;
import java.util.Map;
/**
 * Service-layer contract for basic HBase table and row operations.
 * Every operation reports its outcome as a {@link Result}.
 */
public interface baseService {

    /**
     * Creates a table.
     *
     * @param tableName name of the table to create
     * @return outcome of the creation attempt
     */
    Result createTable(String tableName);

    /**
     * Inserts one row of device data.
     *
     * @param tableName  target table
     * @param deviceTime device timestamp
     * @param modelID    model identifier
     * @param deviceID   device identifier
     * @param dataMap    values to store, keyed by name
     * @return outcome of the insert
     */
    Result insertData(String tableName, String deviceTime, String modelID, String deviceID, Map<String, ?> dataMap);

    /**
     * Inserts a batch of prepared puts.
     *
     * @param tableName target table
     * @param puts      puts to apply
     * @return outcome of the insert
     */
    Result insertData(String tableName, List<Put> puts);

    /**
     * Reports whether a table exists.
     *
     * @param tableName name of the table
     * @return outcome describing whether the table exists
     */
    Result isTableExist(String tableName);
}
5.service层实现类
package com.dgindusoft.connector.service.impl;
import com.dgindusoft.connector.util.HbaseUtil;
import com.dgindusoft.connector.entity.Result;
import com.dgindusoft.connector.entity.ResultCodeEnum;
import com.dgindusoft.connector.service.baseService;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * {@link baseService} implementation that delegates to {@link HbaseUtil}.
 * All row data is written to the {@code data} column family.
 */
@Service
public class baseServiceImpl implements baseService {

    /** Column family used for every table created and every cell written by this service. */
    private static final String DATA_FAMILY = "data";

    /**
     * Creates a table with the single column family {@code data}.
     *
     * @param tableName name of the table to create
     * @return a success result when {@link HbaseUtil#createTable} returns {@code true},
     *         otherwise {@link ResultCodeEnum#TABLE_EXIST_ERROR}
     */
    @Override
    public Result createTable(String tableName) {
        List<String> cfs = new ArrayList<>();
        cfs.add(DATA_FAMILY);
        if (HbaseUtil.createTable(tableName, cfs)) {
            return Result.ok().message("表创建成功");
        } else {
            return Result.setResult(ResultCodeEnum.TABLE_EXIST_ERROR);
        }
    }

    /**
     * Writes one row whose key is {@code modelID + deviceID + deviceTime}; each map entry
     * becomes one column in the {@code data} family, with the value stored as its string form.
     *
     * <p>The {@code dataMap} parameter is kept as a raw type to stay override-compatible
     * with the {@link baseService} declaration.
     *
     * @param tableName  target table
     * @param deviceTime device timestamp, last part of the row key
     * @param modelID    model identifier, first part of the row key
     * @param deviceID   device identifier, middle part of the row key
     * @param dataMap    column qualifier to value; must not be {@code null}
     * @return a success result
     */
    @Override
    public Result insertData(String tableName, String deviceTime, String modelID, String deviceID, Map dataMap) {
        String rowKey = modelID + deviceID + deviceTime;
        Put put = new Put(Bytes.toBytes(rowKey));
        Map<?, ?> fields = dataMap;
        for (Map.Entry<?, ?> field : fields.entrySet()) {
            put.addColumn(Bytes.toBytes(DATA_FAMILY),
                    Bytes.toBytes(String.valueOf(field.getKey())),
                    Bytes.toBytes(String.valueOf(field.getValue())));
        }
        List<Put> puts = new ArrayList<>();
        // Queue the row exactly once, and only when it carries at least one column.
        if (!fields.isEmpty()) {
            puts.add(put);
        }
        try {
            HbaseUtil.putRows(tableName, puts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // NOTE(review): success is reported even if the write failed; no failure Result
        // is visible from here — confirm which ResultCodeEnum value should be returned.
        return Result.ok().message("插入成功");
    }

    /**
     * Writes a batch of prepared puts.
     *
     * <p>The {@code puts} parameter is kept as a raw type to stay override-compatible
     * with the {@link baseService} declaration.
     *
     * @param tableName target table
     * @param puts      puts to apply
     * @return a success result
     */
    @Override
    public Result insertData(String tableName, List puts) {
        try {
            HbaseUtil.putRows(tableName, puts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // NOTE(review): success is reported even if the write failed — see note above.
        return Result.ok().message("插入成功");
    }

    /**
     * Reports whether a table exists.
     *
     * @param tableName name of the table
     * @return {@link ResultCodeEnum#TABLE_EXIST_ERROR} when the table exists, otherwise a
     *         success result stating that it does not
     */
    @Override
    public Result isTableExist(String tableName) {
        boolean exists = HbaseUtil.isTableExist(tableName);
        if (exists) {
            return Result.setResult(ResultCodeEnum.TABLE_EXIST_ERROR);
        } else {
            return Result.ok().message("该表不存在");
        }
    }
}



