栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

HBase(3)- HBase API操作

HBase(3)- HBase API操作

1 Hbase API操作 1.1 环境准备

新建项目后在pom.xml中添加依赖:

        
            org.apache.hbase
            hbase-server
            1.4.13
        

        
            org.apache.hbase
            hbase-client
            1.4.13
        
        
            junit
            junit
            4.12
        
1.2 HbaseAPI代码编写
package com.jackyan.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TestHbaseApi {
    Connection connection = null;
    Configuration configuration = null;
    String tableName = "jackyan:emp";

    
    @Before
    public void getConnect() throws IOException {
        configuration = HbaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        System.out.println(connection);
    }

    
    @Test
    public void createTable() throws IOException {

        HbaseAdmin admin = (HbaseAdmin) connection.getAdmin();
        boolean b = admin.tableExists(tableName);

        if (b) {
            System.out.println("表已存在!!!");
        }
        // 表不存在则创建表
        if(!b) {
            TableName table = TableName.valueOf(tableName);
            HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
            HColumnDescriptor family = new HColumnDescriptor("basic");
            hTableDescriptor.addFamily(family);
            family = new HColumnDescriptor("info");
            hTableDescriptor.addFamily(family);

            try {
                admin.createTable(hTableDescriptor);
            } catch (NamespaceNotFoundException e) {
                // 命名空间不存在则先创建命名空间
                System.out.println("创建命名空间。。。");
                NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("jackyan").build();
                admin.createNamespace(namespaceDescriptor);

                // 再创建表
                admin.createTable(hTableDescriptor);
            }
            System.out.println("表创建成功!!!");
        }
        admin.close();
    }

    
    @Test
    public void deleteTable() throws IOException {
        // 判断表是否存在
        HbaseAdmin admin = (HbaseAdmin) connection.getAdmin();
        // 表如果存在,则删除表
        if (admin.tableExists(tableName)) {
            // 先disable表
            admin.disableTable(tableName);
            // 删除表
            admin.deleteTable(tableName);
            System.out.println("表 " + tableName + " 删除成功!");
        } else {
            System.out.println("表 " + tableName + " 不存在!");
        }

        admin.close();
    }

    
    @Test
    public void addRowData() throws IOException {
        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        String rowkey = "100001";
        Put put = new Put(Bytes.toBytes(rowkey));
        String family = "basic";
        String column = "name";
        String value = "tom";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "basic";
        column = "age";
        value = "28";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "basic";
        column = "sex";
        value = "male";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        rowkey = "100002";
        put = new Put(Bytes.toBytes(rowkey));
        family = "basic";
        column = "name";
        value = "jack";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "basic";
        column = "age";
        value = "26";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "basic";
        column = "sex";
        value = "male";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "info";
        column = "email";
        value = "12345678@qq.com";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        family = "info";
        column = "phone";
        value = "12345678";
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        table.close();
        System.out.println("插入数据成功!");
        table.close();
    }

    
    @Test
    public void getAllRowData() throws IOException {
        // 获取用于获取region数据的scan对象
        Scan scan = new Scan();

        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 获取查询结果集
        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {
            // 获取单元格数据
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
                System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("value:" + Bytes.toString(CellUtil.clonevalue(cell)));
            }
        }
        table.close();
    }

    
    @Test
    public void getoneRowData() throws IOException {

        String rowkey = "100002";
        Get get = new Get(Bytes.toBytes(rowkey));

        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 获取查询结果集;
        Result result = table.get(get);

        // 获取单元格数据
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value:" + Bytes.toString(CellUtil.clonevalue(cell)));
        }
        table.close();
    }

    
    @Test
    public void getRowQualifier() throws IOException {

        String rowkey = "100001";
        Get get = new Get(Bytes.toBytes(rowkey));
        String family = "basic";
        String qualifier = "name";
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));

        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 获取查询结果集;
        Result result = table.get(get);

        // 获取单元格数据
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value:" + Bytes.toString(CellUtil.clonevalue(cell)));
        }
        table.close();
    }

    
    @Test
    public void deleteRowData() throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));

        List deleteList = new ArrayList();
        String rowkey = "100001";
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        deleteList.add(delete);

        rowkey = "100002";
        delete = new Delete(Bytes.toBytes(rowkey));
        deleteList.add(delete);
        table.delete(deleteList);
        table.close();
        System.out.println("删除数据成功!");
    }

    
    @After
    public void close() throws IOException {
        if (connection != null) {
            connection.close();
        }
        System.out.println("关闭资源。。。");
    }
}
1.3 HbaseUtil
package com.jackyan.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;
import java.util.Set;


public class HbaseUtil {

    // 为了线程安全,故使用ThreadLocal
    private static ThreadLocal connThreadLocal = new ThreadLocal();

    
    public static void getConnection() {
        
        // 从ThreadLocal中获取连接对象
        Connection conn = connThreadLocal.get();

        // 如果获取不到连接,则创建连接并放入ThreadLocal中
        if (conn == null) {
            Configuration conf = HbaseConfiguration.create();
            try {
                conn = ConnectionFactory.createConnection(conf);
                connThreadLocal.set(conn);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    
    public static void put(String tablename, String rowkey, String famliyname, String column, String value) throws Exception {
        // 获取连接
        Connection conn = connThreadLocal.get();
        // 获取表
        Table table = conn.getTable(TableName.valueOf(tablename));
        // 将字符串转换成byte[]
        byte[] rowkeybyte = Bytes.toBytes(rowkey);
        Put put = new Put(rowkeybyte);
        put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        table.close();
        System.out.println("ok");
    }

    
    public static void put(String tablename, String rowkey, String famliyname, Map datamap) throws Exception {
        // 获取连接
        Connection conn = connThreadLocal.get();
        // 获取表
        Table table = conn.getTable(TableName.valueOf(tablename));
        // 将字符串转换成byte[]
        byte[] rowkeybyte = Bytes.toBytes(rowkey);
        Put put = new Put(rowkeybyte);
        if(datamap != null){
            Set> set = datamap.entrySet();
            for(Map.Entry entry : set){
                String column = entry.getKey();
                String value = entry.getValue();
                put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(column), Bytes.toBytes(value));
            }
        }
        table.put(put);
        table.close();
        System.out.println("ok");
    }

    
    public static String getdata(String tablename, String rowkey, String famliyname, String colum) throws Exception {
        // 获取连接
        Connection conn = connThreadLocal.get();
        // 获取表
        Table table = conn.getTable(TableName.valueOf(tablename));
        // 将字符串转换成byte[]
        byte[] rowkeybyte = Bytes.toBytes(rowkey);
        Get get = new Get(rowkeybyte);
        Result result =table.get(get);
        byte[] resultbytes = result.getValue(famliyname.getBytes(),colum.getBytes());
        if(resultbytes == null){
            return null;
        }

        return new String(resultbytes);
    }

    
    public static void putdata(String tablename, String rowkey, String famliyname,String colum,String data) throws Exception {
        // 获取连接
        Connection conn = connThreadLocal.get();
        // 获取表
        Table table = conn.getTable(TableName.valueOf(tablename));
        Put put = new Put(rowkey.getBytes());
        put.addColumn(famliyname.getBytes(),colum.getBytes(),data.getBytes());
        table.put(put);
    }

    
    public static void close() {
        Connection conn = connThreadLocal.get();
        if (conn != null) {
            try {
                conn.close();
                connThreadLocal.remove();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
1.4 MapReduce

通过Hbase的相关JavaAPI,我们可以实现伴随Hbase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到Hbase的表中,比如我们从Hbase中读取一些原始数据后使用MapReduce做数据分析。

1.4.1 官方Hbase-MapReduce

1.查看Hbase的MapReduce任务的执行

$ bin/hbase mapredcp

2.环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)

$ export Hbase_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop
$ export HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase mapredcp`

(2)永久生效:在/etc/profile配置

export Hbase_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop

并在hadoop-env.sh中配置:(注意:在for循环之后配)

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib

        //将读取到的每行数据写入到context中作为map的输出
        context.write(key, put);
    }
}

WriteDataToTableMRReducer

public class WriteDataToTableMRReducer extends TableReducer {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {

        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

2、打包
此处需要打成可运行jar包
选中需要打包的项目或模块-->Project Structure-->Artifacts-->'+'-->JAR-->From modules with dependcies...

选择需要打包的模块,选择main函数所在的主类,选择copy to the output...,最后选择打包配置信息存放的目录,一般选择resoureces目录

然后选择Build-->Build Artifacts...

最后build,build成功之后的jar包在主项目根目录下的out目录下

3、将打包后的jar目录上传到linux环境运行
运行命令

yarn jar jar/hbase_jar/hbase.jar
1.4.3 从hdfs上的文件将数据插入到hbase表中

代码编写
File2TableApplication

public class File2TableApplication {

    public static void main(String[] args) throws Exception{

        // TooRunner可以运行mapreduce作业
        ToolRunner.run(new File2TableMapReduceTool(), args);
    }
}

File2TableMapReduceTool

public class File2TableMapReduceTool implements Tool {
    public int run(String[] args) throws Exception {

        // 获取作业
        Job job = Job.getInstance();
        job.setJarByClass(File2TableMapReduceTool.class);

        Path path = new Path("hdfs://hadoop101:9000/user.csv");

        // 设置FileInputFormat
        FileInputFormat.addInputPath(job, path);
//        FileInputFormat.setInputPaths(job, path);

        // mapper
        job.setMapperClass(ReadDataFromFileMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //reducer
        TableMapReduceUtil.initTableReducerJob(
                "user",
                WriteDataToTableMRReducer.class,
                job
        );

        boolean b = job.waitForCompletion(true);
        return b ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
    }

    public void setConf(Configuration conf) {

    }

    public Configuration getConf() {
        return null;
    }
}

ReadDataFromFileMapper

public class ReadDataFromFileMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] fields = value.toString().split(",");

        String rowkey = fields[0];
        String name = fields[1];
        String age = fields[2];

        // 初始化rowkey
        ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));

        // 创建put对象
        Put put = new Put(Bytes.toBytes(rowkey));

        // 增加列族:列:值
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));

        context.write(rowkeyWritable, put);
    }
}

WriteDataToTableMRReducer

public class WriteDataToTableMRReducer extends TableReducer {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {

        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

接下来是打可运行jar包及运行,打包步骤及运行方法详见上一个示例。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/688519.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号