栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hbase——javaAPI

Hbase——javaAPI

上一篇文章Hbase主要讲解了Hbase的核心架构基础理论知识,本篇文章是相关Hbase——JavaAPI的使用


DDL
package com.tommy.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;



public class TestAPI {

    private static Connection connection = null;
    private static Admin admin = null;
//------------------------------------------------------------------------------------
    static {    

        try {
            //1. get configuration information
            Configuration configuration = HbaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "Hadoop104");

            //2. create connection
            connection = ConnectionFactory.createConnection(configuration);

            //3. create admin
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
//------------------------------------------------------------------------------------
    // 1. 判断表是否存在
    public static boolean isTableExist(String tableName) throws IOException {

        boolean exists = admin.tableExists(TableName.valueOf(tableName));

        return exists;
    }
//------------------------------------------------------------------------------------
    // 2. create table
    public static void createTable(String tableName, String... cfs) throws IOException {

        //1. 判断是否存在列族信息
        if (cfs.length <= 0) {
            System.out.println("column is not exists!!");
            return;
        }

        // 2. table if exists
        if (isTableExist(tableName)) {
            System.out.println(tableName + " is already exists!");
        }

        // 3.create table's description
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

        // 4.循环添加列族信息
        for (String cf : cfs) {

            //5.创建列族描述器
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);

            // 6.添加具体的列族信息
            hTableDescriptor.addFamily(hColumnDescriptor);
        }

        // 7.create table
        admin.createTable(hTableDescriptor);
    }
//------------------------------------------------------------------------------------
    // 3. 删除表
    public static void drop(String tableName) throws IOException {

        if (!isTableExist(tableName)) {
            System.out.println(tableName + " is not exists!");
            return;
        }

        // disable table
        admin.disableTable(TableName.valueOf(tableName));

        // drop table
        admin.deleteTable(TableName.valueOf(tableName));

        System.out.println("drop table successful!!");
    }
//------------------------------------------------------------------------------------
    // 4. 创建命名空间
    public static void createNameSpace(String ns) {

        // 1. create namespace inspector
        // 私有化的构造方法中都有Builder类
        NamespaceDescriptor build = NamespaceDescriptor.create(ns).build();

        // 2、创建命名空间
        try {
            admin.createNamespace(build);
        } catch (NamespaceExistException e) {
            System.out.println(ns + "命名空间已经存在");
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
    }
//------------------------------------------------------------------------------------
// 关闭资源
    public static void close() {

        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
//------------------------------------------------------------------------------------
    public static void main(String[] args) throws IOException {

        createTable("0408:stu5", "info1", "info2");
        close();
    }
}


DML
	public static void putData(String tableName, String rowKey, String cf, String cn, String value)
            throws IOException {

        // 1. 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2. create put
        Put put = new Put(Bytes.toBytes(rowKey));

        // 3.给Put对象赋值
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
        // List puts
		
        // 4. insert value
        table.put(put);   // table.put(puts)

        // 5. close
        table.close();
    }
//---------------------------------------------------------------------------------------------
// 测试:
    public static void main(String[] args) throws IOException {

        putData("stu4", "1001", "info", "name", "zhangsan");
        close();
    }

DML get
  • 方式一:指定到rowkey:
    public static void getData(String tableName, String rowKey, String cf, String cn)
            throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2. get "GET" object
        Get get = new Get(Bytes.toBytes(rowKey));

        Result result = table.get(get);

        for (Cell cell : result.rawCells()) {
            System.out.println("CF->" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "CN->" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "Value->" + Bytes.toString(CellUtil.clonevalue(cell)));
        }
    }
  • 方式二:指定到ColumnFamily
    public static void getData(String tableName, String rowKey, String cf, String cn)
            throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2. get "Get" Object
        Get get = new Get(Bytes.toBytes(rowKey));

        // 3. 指定到列族
        Get get1 = get.addFamily(Bytes.toBytes(cf));

        Result result = table.get(get1);

        for (Cell cell : result.rawCells()) {
            System.out.println("RowKey--> " + Bytes.toString(CellUtil.cloneRow(cell))
             + "Column Family--> " + Bytes.toString(CellUtil.cloneFamily(cell))
             + "CN--> " + Bytes.toString(CellUtil.cloneFamily(cell))
             + "Value--> " + Bytes.toString(CellUtil.clonevalue(cell)));
        }

        table.close();
    }
  • 方式三:指定到列:
    public static void getData(String tableName, String rowKey, String cf, String cn)
            throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        Get get = new Get(Bytes.toBytes(rowKey));

        Get get1 = get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));

        Result result = table.get(get1);

        for (Cell cell : result.rawCells()) {
            System.out.println("RK--> " + Bytes.toString(CellUtil.cloneRow(cell))
             + "CF--> " + Bytes.toString(CellUtil.cloneFamily(cell))
             + "CN--> " + Bytes.toString(CellUtil.cloneQualifier(cell))
             + "Value--> " + Bytes.toString(CellUtil.clonevalue(cell)));
        }
        
        table.close();
    }
scan:

仅需传一个参数即可TableName

 

删除操作:

限定保存两个版本的数据,即使两条数据在同一个内存中,在flush后也不会被删除,两个版本的数据均存在。
在命令行中无法做到删除单个版本,即使该列族指定了多个版本,在命令行中只能全删除。

  • 指定rowkey删除:
    public static void delete(String tableName, String rowKey, String cf, String cn) throws IOException {

        // 1. 获取到表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2. get delete object
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        // 3. 执行删除操作
        table.delete(delete);

        table.close();

    }

该操作能杀出指定rowkey的数据。

addColumns and addColumn问题:
删除操作使用指定到列名:

    public static void delete(String tableName, String rowKey, String cf, String cn) throws IOException {

        // 1. 获取到表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2. get delete object
        Delete delete = new Delete(Bytes.toBytes(rowKey));

		delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));  ///

        // 3. 执行删除操作
        table.delete(delete);

        table.close();

    }

利用delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));将会删除所有的版本数据,并不会保留任何,删除数据时建议使用这种,对于默认一个版本的列族删除数据不会出现任何问题;
利用delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn))删除数据,假设在同一cell下put了两个不同的数据,版本数为1,将会进行覆盖,假设还没达到刷写的条件,所以两个数据都在内存中,指定删除后,只会删除最新版本的数据,查看数据时将会出现老版本的数据,从而出现了错误,正是因为Hbase使用时间戳作为数据的标识导致的错误,所以在删除单个版本的数据建议使用addColumns

指定列族删除
添加:delete.addFamily(Bytes.toBytes(cf));
注:在命令行中无法实现列族的删除。

删除操作总结:

  • 指定到rowkey:删除标记:deleteFamily,删除所有版本
  • 指定到rowkey + ColumnFamily:只能在API中实现,删除标记:deleteFamily,删除所有的版本,可以指定时间戳,指定后删除时间戳小于等于指定时间戳的所有数据。
  • 指定到rowkey + columnFamily + cn:指定到列:
    • addColumns():删除指定列的所有版本,传时间戳后删除小于等于该时间戳的所有数据,删除标记:DeleteColumn
    • addColumn():删除的是单个版本,不传时间戳删除最新版的数据,指定时间戳后删除指定时间戳的数据,删除标记:delete,问题比较大,覆盖数据删除最新版,不应该出现的数据会出来,尽量少用,或者建表的时候尽量用一个版本,或者使用多个版本不能和该删除操作结合用。
与Mapreduce进行整合

作为Hbase 的分析引擎。
MapReduce读取Hbase的数据,所以为Hadoop应该有Hbase的jar包。

环境变量的配置:

/etc/profile:添加
export Hbase_HOME=/home/moudel/hbase-1.3.1
export HADOOP_HOME=/home/moudel/hadoop-2.7.3
/home/moudel/hadoop-2.7.3/etc/hadoop/hadoop-env.sh:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/moudel/hbase-1.3.1/lib/*

官方案例一:
统计表中数据行数:rowcounter

{$HADOOP_HOME}/bin/hadoop jar {$Hbase_HOME}/lib/hbase-server-1.3.1.jar rowcounter stu3

官方案例二:
将本地文件上传到hdfs中,将hdfs中的文件保存到hbase的表格中。

{$HADOOP_HOME}/bin/yarn jar{$Hbase_HOME}/lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=Hbase_ROW_KEY,info:name,info:color fruit hdfs://Hadoop104:9000/fruit.tsv

自定义样例:利用MR向Hbase中写入数据
代码:

// Driver
package com.tommy.mapred1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver implements Tool {

    // 定义一个Configuration
    private Configuration configuration = null;

    @Override
    public int run(String[] args) throws Exception {

        // get job
        Job job = Job.getInstance(configuration);

        // set jar path
        job.setJarByClass(FruitDriver.class);

        // set mapper's K V
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // set reducer class
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);

        // set input params
        FileInputFormat.setInputPaths(job, new Path(args[0]));


        // commit job
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        configuration = conf;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {

        try {
            Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new FruitDriver(), args);

            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// Mapper
package com.tommy.mapred1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

// Reducer
package com.tommy.mapred1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class FruitReducer extends TableReducer {

    @Override
    protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException {

        // 遍历values: 1001 apple red
        for (Text value : values) {

            // 获取每一行的数据
            String[] split = value.toString().split("t");

            // create Put 没有空参构造
            Put put = new Put(Bytes.toBytes(split[0]));

            // set value to put obj
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));

            // write
            context.write(NullWritable.get(), put);
        }
    }
}

自定义样例:从Hbase中读取数据写入Hbase的另一张表中:

// Driver
package com.tommy.mapred2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {

    private Configuration conf = null;

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 1.get job
        Job job = Job.getInstance(conf);

        // set main Loc
        job.setJarByClass(Fruit2Driver.class);

        // set Mapper && output KV
        // params:  table_name, new Scan(), mapper.class, K_type.class, V_type.class, job
        TableMapReduceUtil.initTableMapperJob(
                "fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );

        // set reducer && output table  利用工具类
        // params:  outputTableName, reducer.class, job
        TableMapReduceUtil.initTableReducerJob(
                "fruit2",
                Fruit2Reducer.class,
                job
        );

        // submit job
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {

        try {
//            Configuration conf = new Configuration();
            Configuration configuration = HbaseConfiguration.create();
            ToolRunner.run(configuration, new Fruit2Driver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// Reducer
package com.tommy.mapred2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Fruit2Reducer extends TableReducer {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {

        for (Put value : values) {   // MR对Java迭代器的优化:每次的对象存储于同一个地址,
            // BeanUtil提供相关的迭代器拷贝当前Reducer对象
            context.write(NullWritable.get(), value);
        }
    }
}


/mapper的注解:
mapper是从Hbase中读取数据,故要继承自TableMapper而并非Mapper,此时输入的泛型已经指定,输出的泛型需要自定义,输出的泛型:ImmutableBytesWritable代表的是RowKey,Put指的是封装好的结果类型,提前在Mapper中封装好,防止在Reducer中负载过大。Put中需要指定行键Put put = new Put(key.get());

Reducer中指定的三个泛型:
三个指定的泛型:

public abstract class TableReducer extends Reducer {
    public TableReducer() {
    }
}

其中最后一个泛型是指定好的:Mutation,代表的是抽象类,其实现方法为:

因为最终输出的位置时Hbase所以在这利用Reducer的子类TableReducer进行定义。
由于最终的输出位置是指定的,所以就不需要用户自定义输出的value类型了。
同理可理解Mapper中的泛型机制,如果对于Mapper而言的输入数据来自于Hbase表格,需要继承Mapper的子类而并非TableMapper,从而输入数据的泛型也是同样给指定好的。

// Mapper

package com.tommy.mapred2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Fruit2Mapper extends TableMapper {

    // read from hbase---> ImmutableBytesWritable, Put

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        Put put = new Put(key.get());

        // 对象的封装在mapper完成
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
            }
        }
        context.write(key, put);
    }
}

注解:
在本地运行时,使用Hbase的Configuration:HbaseConfiguration,加载本地的配置文件,从而在本地访问到集群。
将配置文件hbase-site.xml放到resources目录下,本地运行会利用此配置文件实现连接。
要想在集群中运行jar包,首先要把hbase导入到hadoop的环境变量中,在本地模式下则不需要进行此步骤,因为在本地模式下用maven导入hbase的client和server依赖jar中已经包含了运行MR的所有配置,既hbase的这两个依赖jar中包含hadoop相关配置信息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327209.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号