1、在构建实时数仓时,通常会把 DIM 层(维度层)的数据存入 HBase。这样做有两个好处:第一,HBase 按 rowkey 写入天然幂等,而维度表的数据基本上都具有唯一性,重复写入同一 rowkey 只会覆盖旧值,不会产生重复数据;第二,在实时性要求较高的场景下,可以按 rowkey 做点查关联,查询效率能得到一定的保障。
部分代码如下(在 map 算子中点查 HBase 维表做关联):
// HBase client handles. They are created in open() and released in close();
// all are marked transient because they are runtime-only resources that must
// not be serialized together with the function instance.
// NOTE(review): map() also reads driResOPTable and vehStatusTable, which are
// not declared in this excerpt — confirm they are declared elsewhere.
private transient Connection hbaseConnection;
private transient Admin hbaseAdmin;
private transient Table dribaseInfoResTable;
/**
 * Opens the HBase connection and the handles used by this function.
 *
 * <p>Creates the connection from the ZooKeeper quorum, obtains an {@code Admin},
 * logs an error if the driver base-info table does not exist, and opens the
 * {@code dribaseInfoResTable} handle that {@code map()} reads and {@code close()} releases.
 *
 * <p>NOTE(review): {@code map()} also reads {@code driResOPTable} and
 * {@code vehStatusTable}; their table names are not visible here, so they are
 * not opened in this method — confirm where they are initialized.
 *
 * @param parameters the configuration passed in by the runtime
 * @throws Exception if the connection, admin or table handle cannot be created
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // The HBase factory class is spelled HBaseConfiguration (capital "B").
    org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "192.168.11.1,192.168.11.2,192.168.11.3");

    hbaseConnection = ConnectionFactory.createConnection(configuration);
    hbaseAdmin = hbaseConnection.getAdmin();

    // Parse the table name once and reuse it for the check and the handle.
    TableName baseInfoTableName = TableName.valueOf("test:dim_dri_base_info");
    if (!hbaseAdmin.tableExists(baseInfoTableName)) {
        log.error("hbase table not exists: {}", "test:dim_dri_base_info");
    }

    // Previously this field was never assigned, so every lookup in map()
    // against it failed with a NullPointerException.
    dribaseInfoResTable = hbaseConnection.getTable(baseInfoTableName);
}
/**
 * Enriches an order record with driver and vehicle status values read from
 * HBase, then packs the result into a six-field tuple.
 *
 * <p>The driver lookup and the vehicle lookup are isolated from each other:
 * a failure (or missing key) in one is logged and does not prevent the other
 * from running. When a status is still {@code null} after the lookups,
 * {@code -100} is emitted in its place.
 *
 * <p>The raw {@code Tuple6} return type is kept as declared; the element types
 * of the upload-status and intercept fields are not visible in this excerpt.
 *
 * @param value the order record to enrich; its status fields are updated in place
 * @return a tuple of (upload time, upload status, intercept flag,
 *         driver id-card status, vehicle status, 1L)
 * @throws Exception declared by the overridden method
 */
@Override
public Tuple6 map(UpOrderInfo value) throws Exception {
    lookupDriverStatus(value);
    lookupVehicleStatus(value);

    Integer idCardTaurus = value.getIdCardTaurusStatus() != null ? value.getIdCardTaurusStatus() : -100;
    Integer vehTaurus = value.getVehicleTaurusStatus() != null ? value.getVehicleTaurusStatus() : -100;

    Tuple6 tuple6 = new Tuple6();
    try {
        tuple6.setFields(value.getUpDataTime(), value.getUploadStatus(), value.getIsIntercept(),
                idCardTaurus, vehTaurus, 1L);
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is kept.
        log.error("错误日志为:", e);
        log.error("错误的数据为:" + value);
    }
    return tuple6;
}

/**
 * Resolves the driver's resource id via {@code driResOPTable} and, when found,
 * copies {@code driver_taurus_status} from {@code dribaseInfoResTable} into the record.
 */
private void lookupDriverStatus(UpOrderInfo value) {
    String driverUuid = value.getDriverUuid();
    if (driverUuid == null) {
        // Nothing to look up; previously this was a NullPointerException outside any try block.
        return;
    }
    try {
        // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
        byte[] family = Bytes.toBytes("CF1");

        Get getResOp = new Get(Bytes.toBytes(driverUuid.concat("_op")));
        getResOp.addFamily(family);
        Result resultResOp = driResOPTable.get(getResOp);

        String resUuid = "";
        for (Cell cell : resultResOp.getColumnCells(family, Bytes.toBytes("res_op"))) {
            resUuid = Bytes.toString(CellUtil.cloneValue(cell));
        }
        if (resUuid == null || resUuid.isEmpty()) {
            // No mapping row: Get rejects a zero-length row key, so skip the second lookup.
            return;
        }

        Get getBaseInfo = new Get(Bytes.toBytes(resUuid));
        getBaseInfo.addFamily(family);
        Result resultBaseInfo = dribaseInfoResTable.get(getBaseInfo);
        for (Cell cell : resultBaseInfo.getColumnCells(family, Bytes.toBytes("driver_taurus_status"))) {
            value.setIdCardTaurusStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
        }
    } catch (Exception e) {
        log.error("操作hbase出的错误为:", e);
    }
}

/**
 * Reads the vehicle row (keyed by the reversed plate number) from
 * {@code vehStatusTable} and copies {@code taurus_status} and {@code status_}
 * into the record.
 */
private void lookupVehicleStatus(UpOrderInfo value) {
    String vehicleNo = value.getVehicleNo();
    if (vehicleNo == null) {
        return;
    }
    try {
        byte[] family = Bytes.toBytes("CF1");

        Get getVehNo = new Get(Bytes.toBytes(Utils.reverse(vehicleNo)));
        getVehNo.addFamily(family);
        Result resultVehNo = vehStatusTable.get(getVehNo);

        for (Cell cell : resultVehNo.getColumnCells(family, Bytes.toBytes("taurus_status"))) {
            value.setVehicleTaurusStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
        }
        for (Cell cell : resultVehNo.getColumnCells(family, Bytes.toBytes("status_"))) {
            value.setVehicleGovStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
        }
    } catch (Exception e) {
        log.error("操作hbase出的错误为:", e);
    }
}
@Override
public void close() throws Exception {
if (dribaseInfoResTable != null) {
dribaseInfoResTable.close();
}
if (hbaseAdmin != null) {
hbaseAdmin.close();
}
if (hbaseConnection != null) {
hbaseConnection.close();
}
super.close();
} | | | |



