栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hbase 基础第二讲-javaApi

Hbase 基础第二讲-javaApi

java API

1.java api 操作hbase2.过滤器查询3.Hbase整合Hive

3.1 准备Hbase表和数据 4.HbaseToHDFS:Hbase表数据写入HDFS

1.java api 操作hbase
public class HbaseDemoTest {
    // 声明静态配置
    static Configuration conf = null;
    private static final String ZK_CONNECT_STR =
            "bigdata02:2181,bigdata03:2181,bigdata04:2181,bigdata05:2181";

    static {
        conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
    }

    
    public static void creatTable(String tableName, String[] family) throws Exception {
        HbaseAdmin admin = new HbaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(tableName);
        for (int i = 0; i < family.length; i++) {
            desc.addFamily(new HColumnDescriptor(family[i]));
        }
        if (admin.tableExists(tableName)) {
            System.out.println("table Exists!")
            System.exit(0);
        } else {
            admin.createTable(desc);
            System.out.println("create table Success!");
        }
    }

    
    public static void addData(
            String rowKey, String tableName, String[] column1, String[] value1, String[] column2, String[] value2) throws IOException {
        // 设置rowkey
        Put put = new Put(Bytes.toBytes(rowKey));
        // HTabel负责跟记录相关的操作如增删改查等//
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        // 获取所有的列簇
        HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
        for (int i = 0; i < columnFamilies.length; i++) {
            // 获取列簇名
            String familyName = columnFamilies[i].getNameAsString();
            // article列簇put数据
            if (familyName.equals("article")) {
                for (int j = 0; j < column1.length; j++) {
                    put.add(Bytes.toBytes(familyName),
                            Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
                }
            }
            // author列簇put数据
            if (familyName.equals("author")) {
                for (int j = 0; j < column2.length; j++) {
                    put.add(Bytes.toBytes(familyName),
                            Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
                }
            }
        }
        table.put(put);
        System.out.println("add data Success!");
    }


    
    public static void getResultScann(String tableName) throws IOException {
        Scan scan = new Scan();
        ResultScanner rs = null;
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        try {
            rs = table.getScanner(scan);
            for (Result r : rs) {
                for (KeyValue kv : r.list()) {
                    printKeyValue(kv);
                }
            }
        } finally {
            rs.close();
        }
    }

    
    public static void getResultScann(String tableName, String start_rowkey,
                                      String stop_rowkey) throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(start_rowkey));
        scan.setStopRow(Bytes.toBytes(stop_rowkey));
        ResultScanner rs = null;
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        try {
            rs = table.getScanner(scan);
            printResultScanner(rs);
        } finally {
            rs.close();
        }
    }

    
    public static void getResultByColumn(String tableName, String rowKey, String
            familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        // 获取指定列簇和列修饰符对应的列
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            printKeyValue(kv);
        }
    }

    
    public static void updateTable(
            String tableName, String rowKey, String familyName, String
            columnName, String value) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                Bytes.toBytes(value));
        table.put(put);
        System.out.println("update table Success!");
    }

    
    public static void getResultByVersion(String tableName, String rowKey,
                                          String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
        get.setMaxVersions(5);
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            printKeyValue(kv);
        }
    }

    
    public static void deleteAllColumn(String tableName, String rowKey) throws
            IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
        table.delete(deleteAll);
        System.out.println("all columns are deleted!");
    }

    public static void deleteTable(String tableName) throws IOException {
        HbaseAdmin admin = new HbaseAdmin(conf);
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
        System.out.println(tableName + "is deleted!");
    }
}
2.过滤器查询

比较过滤器,专用过滤器
二.比较器

LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 不做任何操作

三.过滤器

BinaryComparator 按字节索引顺序比较指定字节数组,采用
Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和 NOT_EQUAL
SubstringComparator 判断提供的子串是否出现在 value 中,仅支持 EQUAL 和
NOT_EQUAL

四.比较过滤器
行键过滤器RowFilter

Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("user0000")));
scan.setFilter(filter1);

列簇过滤器FamilyFilter

Filter filter1 = new FamilyFilter(CompareOp.LESS,
new BinaryComparator(Bytes.toBytes("base_info")));
scan.setFilter(filter1)

列过滤器QualifierFilter

Filter filter = new QualifierFilter(CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("name")));
scan.setFilter(filter1);

值过滤器 ValueFilter

Filter filter = new ValueFilter(CompareOp.EQUAL, new
SubstringComparator("zhangsan") );
scan.setFilter(filter1);

时间戳过滤器 TimestampsFilter

List tss = new ArrayList();
tss.add(1495398833002l);
Filter filter1 = new TimestampsFilter(tss);
scan.setFilter(filter1);

专用过滤器
单列值过滤器 SingleColumnValueFilter ----会返回满足条件的整行

SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回
scan.setFilter(filter1);

单列值排除器SingleColumnValueExcludeFilter -----返回排除了该列的结果

SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("colfam1"),
        Bytes.toBytes("col-5"),
        CompareFilter.CompareOp.NOT_EQUAL,
        new SubstringComparator("val-5"));
       filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回 
       scan.setFilter(filter1);

前缀过滤器 PrefixFilter----针对行键

Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);

列前缀过滤器 ColumnPrefixFilter

Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);
3.Hbase整合Hive

Hive 与 Hbase 利用两者本身对外的 API 来实现整合,主要是靠 HbaseStorageHandler 进行通信。Hive 与 Hbase 利用两者本身对外的 API 来实现整合,主要是靠HbaseStorageHandler 进行通信。Hive 与 Hbase 利用两者本身对外的 API 来实现整合,主要是靠 HbaseStorageHandler 进行通信。HiveHbaseTableInputFormat 完成对 Hbase 表的切分,获取 RecordReader 对象来读取数据。对 Hbase 按Regions,确定MapReduce 中就有多少个MapTask

3.1 准备Hbase表和数据

准备Hbase表和数据

create 'mingxing', {NAME => 'base_info',VERSIONS => 1},{NAME =>
'extra_info',VERSIONS => 1}

插入准备数据:

put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'

Hive端操作

指定hbase所使用的zookeeper集群的地址:默认端口是2181,可以不写
set hbase.zookeeper.quorum=bigdata02:2181,bigdata03:2181,bigdata04:2181;
指定hbase在zookeeper中使用的根目录
set zookeeper.znode.parent=/hbase;
加入指定的处理jar
add jar /home/bigdata/apps/apache-hive-2.3.6-bin/lib/hive-hbase-handler-
2.3.6.jar;

所有列簇:

create external table mingxing(rowkey string, base_info map,
extra_info map)
row format delimited fields terminated by 't'
stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,base_info:,extra_info:")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");

列族 列
hbase.columns.mapping" = ":key,base_info:,extra_info:
部分列簇部分列

create external table mingxing1(rowkey string, name string, province string)
row format delimited fields terminated by 't'
stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" =
":key,base_info:name,extra_info:province")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="m
ingxing");

部分参数解释:

org.apache.hadoop.hive.hbase.HbaseStorageHandler:处理hive到hbase转换关系的处理器
hbase.columns.mapping:定义hbase的列簇和列到hive的映射关系
hbase.table.name:hbase表名
4.HbaseToHDFS:Hbase表数据写入HDFS

从Hbase中读取数据,分析之后然后写入HDFS,代码实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.List;

public class HbaseDataToHDFSMR {
    public static final String ZK_ConNECT =
            "bigdata02:2181,bigdata03:2181,bigdata04:2181";
    public static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";public static final String HDFS_ConNECT = "hdfs://hadoop277ha/";
    public static final String HDFS_CONNECT_KEY = "fs.defaultFS";
    public static void main(String[] args) throws Exception {
   // 把Hadoop集群的配置文件:core-site.xml 和 hdfs-site.xml 放入 resources 目录中。
        Configuration conf = HbaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT);
        conf.set(HDFS_CONNECT_KEY, HDFS_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "bigdata");
        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseDataToHDFSMR.class);
       // 输入数据来源于hbase的user_info表
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("user_info", scan,
                HbaseDataToHDFSMRMapper.class, Text.class,
                NullWritable.class, job);
        // RecordReader --- TableRecordReader
        // InputFormat ----- TextInputFormat
        // 数据输出到hdfs
        FileOutputFormat.setOutputPath(job, new Path("/hbase2hdfs/output2"));
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
    
    class HbaseDataToHDFSMRMapper extends TableMapper {
        
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                           Mapper.Context context) throws IOException,
                InterruptedException {
             // byte[] rowkey = Bytes.copy(key, 0, key.getLength());
            String rowkey = Bytes.toString(key.copyBytes());
            List listCells = value.listCells();
            Text text = new Text();
          // 最后输出格式是: rowkey, base_info:name-huangbo, base-info:age-34
            for (Cell cell : listCells) {
                String family = new String(CellUtil.cloneFamily(cell));
                String qualifier = new String(CellUtil.cloneQualifier(cell));
                String v = new String(CellUtil.clonevalue(cell));
                long ts = cell.getTimestamp();
                text.set(rowkey + "t" + family + "t" + qualifier + "t" + v +
                        "t" + ts);
                context.write(text, NullWritable.get());
            }
        }
    }
}

HDFSToHbase:HDFS数据写入Hbase
从HDFS从读入数据,处理之后写入Hbase,代码实现:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class HDFSDataToHbaseMR extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new HDFSDataToHbaseMR(), args);
        System.exit(run);
    } 
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration config = HbaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",
                "bigdata02:2181,bigdata03:2181,bigdata04:2181");
        System.setProperty("HADOOP_USER_NAME", "bigdata");
        Job job = Job.getInstance(config, "HDFSDataToHbaseMR");
        job.setJarByClass(HDFSDataToHbaseMR.class);
        job.setMapperClass(HDFSDataToHbase_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
// 设置数据的输出组件
        TableMapReduceUtil.initTableReducerJob("student",
                HDFSDataToHbase_Reducer.class, job);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path("/bigdata/student/input"));
        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    } 
    static class HDFSDataToHbase_Mapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws
                IOException,
                InterruptedException {
            context.write(value, NullWritable.get());
        }
    } 
    static class HDFSDataToHbase_Reducer extends TableReducer {
        protected void reduce(Text key, Iterable values,
                              Reducer.Context context) throws IOException,
                InterruptedException {
            String[] split = key.toString().split(",");
            Put put = new Put(split[0].getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(),
                    split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(),
                    split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(),
                    split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(),
                    split[4].getBytes());
            context.write(NullWritable.get(), put);
        }
    }}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761450.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号