栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java api 操作 hbase,插入数据(单行,多行,批量插入)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java api 操作 hbase,插入数据(单行,多行,批量插入)

java api 操作 hbase (主要批量存放数据)
操作步骤如下:
#创建配置对象,创建连接对象
Configuration config = HbaseConfiguration.create();
config.set("hbase.zookeeper.quorum","single01:2181 ");
Connection hbaseCon=ConnectionFactory.createConnection(config);
//操作对象
Admin admin=hbaseCon.getAdmin();
admin.xxx(tableName)
//创建表名对象
 final String HTable="kb16nb:student";
TableName tableName = TableName.valueOf(HTable);

//操作数据
Table table=hbaseCon.getTable(tableName);
//单行添加数据 
Put row = new Put(byte[] rowkey);
row.addColumn(byte[] columnfamily,byte[] column,byte[] value)
...
table.put(row);

//多行(少量)
List rows =new ArrayList();
rows.add(row);
...
table.put(rows);

//批处理
  //lambada 创建hbase 批量插入数据异常侦听对象
  //java中函数式接口,可以用lambda表达式写
BufferedMutator.ExceptionListener listener=(e,mutator)->{
    //异常信息(原因)
    String msg = e.getMessage();
    //出异常的行数
    int numExceptions = e.getNumExceptions();
    //记录出异常的行的行键,以便事后检查并再处理
    //用Log4j记录
    logger.error("Hbase MUTATE EXCEPTION : "+msg+","+numExceptions);
    if(numExceptions>0){
        StringBuilder builder=new StringBuilder();
        builder.append(Bytes.toString(e.getRow(0).getRow()));
        final String SEP=",";
        for (int i = 0; i  list=new ArrayList<>(BUFFER_SIZE);
...
//放入数据
mutator.mutate(list);

log4j日志
#1导入jar包
#2做配置信息 resources/log4j.properties 
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
#3创建对象
private static final Logger logger=Logger.getLogger(App.class);

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;


public class App 
{
    private static final Logger logger=Logger.getLogger(App.class);
    static void close(AutoCloseable...closes){
        for (AutoCloseable close : closes) {
            if(null!=close){
                try {
                    close.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }

    public static void main( String[] args ) {
        Configuration config = HbaseConfiguration.create();
        config.set("hbase.zookeeper.quorum","single01:2181 ");
        //base连接对象
        Connection hbaseCon=null;
        //admin操作对象(命名空间,数据表...)
        Admin admin =null;
        //table操作表数据
        Table table =null;
        try {
             hbaseCon = ConnectionFactory.createConnection(config);
             admin=hbaseCon.getAdmin();

             final String HTable="kb16nb:student";
            TableName tableName = TableName.valueOf(HTable);
            if (admin.tableExists(tableName)) {

                table=hbaseCon.getTable(tableName);

                //lambda 创建hbase 批量插入数据的异常侦听对象
                //java中一个接口只有一个方法,可以用lambda表达式写
                BufferedMutator.ExceptionListener listener=(e,mutator)->{
                    //异常信息(原因)
                    String msg = e.getMessage();
                    //出异常的行数
                    int numExceptions = e.getNumExceptions();
                    //用Log4j记录,记录出异常的行的行键,以便事后检查并再处理
                    logger.error("Hbase MUTATE EXCEPTION : "+msg+","+numExceptions);
                    if(numExceptions>0){
                        StringBuilder builder=new StringBuilder();
                        builder.append(Bytes.toString(e.getRow(0).getRow()));
                        final String SEP=",";
                        for (int i = 1; i  list=new ArrayList<>(BUFFER_SIZE);
                Random rand=new Random();
                int sum = 0;
                for (int i = 0; i < 100000; i++) {
                    Put put = new Put(Bytes.toBytes(1    + i + ""));
                    put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("name"),Bytes.toBytes("henry"+i));
                    put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("age"),Bytes.toBytes(18+rand.nextInt(1)));
                    int product =rand.nextInt(2);
                    String columnFamily=product ==0?"bigdata":"cloud";
                    String subject01=product ==0 ? "hive":"net";
                    String subject02=product ==0 ? "hbase":"shell";
                    int socre01=55+rand.nextInt(45),socre02=55+rand.nextInt(45);
                    put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("product"),Bytes.toBytes(columnFamily));
                    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(subject01),Bytes.toBytes(socre01));
                    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(subject02),Bytes.toBytes(socre02));
                    list.add(put);
                    if(list.size()>=BUFFER_SIZE){
                        mutator.mutate(list);
                        sum+=list.size();
                        System.out.println("CURRENT"+list.size()+",SUM"+sum);
                        list.clear();
                    }
                }

                if (!list.isEmpty()){
                    mutator.mutate(list);
                    sum+=list.size();
                    System.out.println("CURRENT"+list.size()+",SUM"+sum);
                    list.clear();
                }


                //单行添加数据
                // 通过rowkey创建 Put(ROW行) 对象
                //Bytes.toBytes("Object") 把任意对象转变为字节数组
                

            }else{
                System.out.println(HTable+" NOT EXISTE");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            close(table,admin,hbaseCon);

        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731845.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号