栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink写数据(Sink)到HBASE报错解决

Flink写数据(Sink)到HBASE报错解决

最近在做一个实时数仓的项目,需要把维度数据打入到Hbase中,以便实时分析时使用。在写入数据到Hbase中时,遇到了以下问题:

1、连接报错:

        根据提示,应该是连接被关闭了

Caused by: java.lang.IllegalArgumentException: Connection is null or closed.
	at org.apache.hadoop.hbase.client.HTable.(HTable.java:308)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:747)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:729)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getTable(ConnectionManager.java:553)
	at cn.naixue.dwd.dim.DimHbaseSink$HBseSink.invoke(DimHbaseSink.java:115)

代码中看起来没什么问题:

 //初始化hbase连接conf和conn
        private static org.apache.hadoop.conf.Configuration conf;
        private static Connection conn;
       
        @Override
        public void open(Configuration parameters) throws Exception {
            //连接配置,设置zk
            conf = HbaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.233.1");
            conf.set("hbase.zookeeper.property.clientPort", "2181");

                //进行连接
            conn = ConnectionFactory.createConnection(conf);
            }
       

     

解决方法:找了别人的例子做为参考,有一个解决mysql连接的思路:

“使用连接池,每次使用的时候是从连接池获取连接,open 方法不是用来获取连接的而是初始化连接池,你应该在你的 invoke 里面 getConnection 然后用完之后 close” 见RichSinkFunction close只有任务结束时候才会去调用,但是数据库连接一直拿着,最后-问答-阿里云开发者社区-阿里云

只理解了后半段,把数据库连接放在invoke方法中实现;

2、连接次数过多报错:

接上一个问题,把数据库连接放在invoke方法中实现后,连接没问题,但是程序处理部分数据后就开始报错了“ERROR o.a.h.h.c.AsyncProcess  - Cannot get replica 0 location for 【数据块内容】”

这个问题经查询,应该是连接次数过多的问题。那么怎么办呢?该回去吧

3、继续跟着第一个问题的解题思路,使用连接池。

public void open(Configuration parameters) throws Exception {
            //连接配置,设置zk
            conf = HbaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.233.1");
            conf.set("hbase.zookeeper.property.clientPort", "2181");

                //进行连接
            conn = ConnectionFactory.createConnection(conf,
                    new ThreadPoolExecutor(
                            20,100,60,
                            TimeUnit.MINUTES,
                            new linkedBlockingDeque<>(20)
                    ));

这样处理,看起来连接数不成问题了,但第一个问题仍在出现。

这时候看日志发现,在open方法中,初始化了多次Hbase连接;但实际上我们只需要一个写入,这时候有了一个大胆的设想,把Hbase的SINK并行度设置为1 ,系统正常运行了。

总结:这个应该是Hbase的SINK的并行度的问题,我理解这种方式设置多并行度无法支撑多个连接导致,所以这个时候不应该用分布式写入,具体需要看看官方是否支持。

下面附本Sink代码:

        思路简述:

                有多张维度表写入到hbase中,所有的数据通过maxwell打入到一个topic中,现在从中取出维度表的数据(在main方法中过滤了),这里处理逻辑中,

                1、将id做为rowkey;

                2、将整体数据做为一个String存放到Hbase中,到使用的时候获取到进行解析处理。

                3、由于是按表写入到hbase中,故没有精细化处理表中字段(如果处理的话可以节省存储开支,但需要每个表进行定义)

    public static class HBseSink extends RichSinkFunction{
        //初始化hbase连接conf和conn
        private static org.apache.hadoop.conf.Configuration conf;
        private static Connection conn;

        @Override
        public void open(Configuration parameters) throws Exception {
            //连接配置,设置zk
            conf = HbaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.233.1");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
                //进行连接
            conn = ConnectionFactory.createConnection(conf,
                    new ThreadPoolExecutor(
                            20,100,60,
                            TimeUnit.MINUTES,
                            new linkedBlockingDeque<>(20)
                    ));
            System.out.println(conn.isClosed() +":::"+conn  );
        }

        @Override
        public void invoke(String element, Context context) throws Exception {
            MaxwellEntity maxwellEntity = JSONObject.parseObject(element, MaxwellEntity.class);
            Map jsonMap = maxwellEntity.getData();
            String tableName = maxwellEntity.getTable();
            String rowKey = jsonMap.get("id").toString();
            String columnFamily = "dim";  //默认维度表不区分
            //开始进入处理
            System.out.println(tableName+":::"+conn);
            System.out.println(conn.isClosed());
            Table table = conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes("value"),Bytes.toBytes(jsonMap.toString()));
            
            table.put(put);
        }
        //关闭操作
        @Override
        public void close() throws Exception {
            if(conn != null){conn.close();}
        }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784569.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号