最近在做一个实时数仓的项目,需要把维度数据打入到Hbase中,以便实时分析时使用。在写入数据到Hbase中时,遇到了以下问题:
1、连接报错:
根据提示,应该是连接被关闭了
Caused by: java.lang.IllegalArgumentException: Connection is null or closed. at org.apache.hadoop.hbase.client.HTable.(HTable.java:308) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:747) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:729) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:553) at cn.naixue.dwd.dim.DimHbaseSink$HBseSink.invoke(DimHbaseSink.java:115)
代码中看起来没什么问题:
//初始化hbase连接conf和conn
private static org.apache.hadoop.conf.Configuration conf;
private static Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
//连接配置,设置zk
conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.233.1");
conf.set("hbase.zookeeper.property.clientPort", "2181");
//进行连接
conn = ConnectionFactory.createConnection(conf);
}
解决方法:找了别人的例子做为参考,有一个解决mysql连接的思路:
“使用连接池,每次使用的时候是从连接池获取连接,open 方法不是用来获取连接的而是初始化连接池,你应该在你的 invoke 里面 getConnection 然后用完之后 close” 见RichSinkFunction close只有任务结束时候才会去调用,但是数据库连接一直拿着,最后-问答-阿里云开发者社区-阿里云
只理解了后半段,把数据库连接放在invoke方法中实现;
2、连接次数过多报错:
接上一个问题,把数据库连接放在invoke方法中实现后,连接没问题,但是程序处理部分数据后就开始报错了“ERROR o.a.h.h.c.AsyncProcess - Cannot get replica 0 location for 【数据块内容】”
这个问题经查询,应该是连接次数过多的问题。那么怎么办呢?该回去吧
3、继续跟着第一个问题的解题思路,使用连接池。
public void open(Configuration parameters) throws Exception {
//连接配置,设置zk
conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.233.1");
conf.set("hbase.zookeeper.property.clientPort", "2181");
//进行连接
conn = ConnectionFactory.createConnection(conf,
new ThreadPoolExecutor(
20,100,60,
TimeUnit.MINUTES,
new linkedBlockingDeque<>(20)
));
这样处理,看起来连接数不成问题了,但第一个问题仍在出现。
这时候看日志发现,在open方法中,初始化了多次Hbase连接;但实际上我们只需要一个写入,这时候有了一个大胆的设想,把Hbase的SINK并行度设置为1 ,系统正常运行了。
总结:这个应该是Hbase的SINK的并行度的问题,我理解这种方式设置多并行度无法支撑多个连接导致,所以这个时候不应该用分布式写入,具体需要看看官方是否支持。
下面附本Sink代码:
思路简述:
有多张维度表写入到hbase中,所有的数据通过maxwell打入到一个topic中,现在从中取出维度表的数据(在main方法中过滤了),这里处理逻辑中,
1、将id做为rowkey;
2、将整体数据做为一个String存放到Hbase中,到使用的时候获取到进行解析处理。
3、由于是按表写入到hbase中,故没有精细化处理表中字段(如果处理的话可以节省存储开支,但需要每个表进行定义)
public static class HBseSink extends RichSinkFunction{
//初始化hbase连接conf和conn
private static org.apache.hadoop.conf.Configuration conf;
private static Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
//连接配置,设置zk
conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.233.1");
conf.set("hbase.zookeeper.property.clientPort", "2181");
//进行连接
conn = ConnectionFactory.createConnection(conf,
new ThreadPoolExecutor(
20,100,60,
TimeUnit.MINUTES,
new linkedBlockingDeque<>(20)
));
System.out.println(conn.isClosed() +":::"+conn );
}
@Override
public void invoke(String element, Context context) throws Exception {
MaxwellEntity maxwellEntity = JSONObject.parseObject(element, MaxwellEntity.class);
Map jsonMap = maxwellEntity.getData();
String tableName = maxwellEntity.getTable();
String rowKey = jsonMap.get("id").toString();
String columnFamily = "dim"; //默认维度表不区分
//开始进入处理
System.out.println(tableName+":::"+conn);
System.out.println(conn.isClosed());
Table table = conn.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes("value"),Bytes.toBytes(jsonMap.toString()));
table.put(put);
}
//关闭操作
@Override
public void close() throws Exception {
if(conn != null){conn.close();}
} 


