栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

08-Hbase

08-Hbase

08-Hbase 1.目标概述

了解Hbase基本概念了解Hbase架构掌握Hbase集群环境搭建掌握Hbase中shell操作掌握Hbase中java API操作掌握rowKey的设计原则【延伸】MapReduce操作Hbase【延伸】spark操作Hbase 2.Hbase基础 2.1.基本概念 2.1.1.介绍

官网地址:http://hbase.apache.org/

Hbase在Hadoop之上提供了类似于Bigtable的能力,是BigTable的开源实现版本,通过java编程语言实现 。Hbase不同于关系数据库,它是一个适合于非结构化数据存储的数据库 。

Hbase是一个分布式的、面向列的开源数据库 ,是一个NoSQL的分布式数据存储系统,实现对大型数据的实时、随机读写访问。

Hbase利用Hadoop HDFS作为文件存储系统,利用Hadoop MapReduce来处理Hbase中的海量数据 。

Hbase依赖于hdfs做底层的数据存储Hbase依赖于mapReduce做数据计算Hbase依赖于zookeeper做服务协调

2.1.2.特点

提供高可靠、高性能、列存储、可伸缩、实时读写的nosql数据库系统

介于NoSQL和RDBMS之间,仅能通过主键(row key)和主键的范围(range)来检索数据主要用来存储非结构化和半结构化的松散数据Hbase中底层数据存储都是字节数组:byte[]

Hbase中表的特点

大:一个表可以有上十亿行,上百万列

面向列:面向**列(族)**的存储和权限控制,列(簇)独立检索

稀疏:空(null)列不占用存储空间,表可以设计的非常稀疏

无模式:每行都有一个可排序的主键和任意多的列,列可以动态增加,不同的行可以有不同的列

和行式数据库的区别

行式数据库在分析的时候,操作整行数据。比如将字段id,name,age,sex,score等完整的信息读入内存,造成大量的内存和IO资源浪费列式数据库是把行式数据库全部拆开,按照列的方式重新组合存储,一列所有的行的数据存放在一起。带来的好处就是,比如要分析男女信息,就直接访问所有的男女信息;要分析销售额,就直接访问消费额相关的数据。

2.2.Hbase表结构

表(table):用于存储管理数据,具有稀疏的、面向列的特点。Hbase中的每一张表,就是所谓的大表(Bigtable),可以有上十亿行,上百万列。对于值为空的列,并不占用存储空间,因此表可以设计的非常稀疏

行键(RowKey):类似于mysql中的主键,Hbase根据行键来快速检索数据,一个行键对应一条记录。

列簇(ColumnFamily):是列的集合。列簇在表定义时需要指定,列在插入数据时动态指定。列中的数据都是以二进制形式存在,没有数据类型。在物理存储结构上,每个表中的每个列簇单独以一个文件存储,一个表可以有多个列簇(官方推荐小于等于3个)

时间戳(TimeStamp):是列的一个属性,是一个64位整数。由行键和列确定的单元格(Cell),可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。可根据版本(VERSIONS)或时间戳来指定查询历史版本数据,如果都不指定,则默认返回最新版本的数据

参考案例:

mysql数据库表:

转换对应成Hbase表:

2.3.Hbase整体架构

2.3.1.Client

通过 RPC机制与HMaster和HRegionServer进行通信Client与HMaster进行通信进行管理类操作Client与HRegionServer进行数据读写类操作 2.3.2.Zookeeper

保证任何时候,集群中只有一个运行状态的 Master,避免脑裂问题存储所有Region的寻址入口,包括**-ROOT-**表地址、HMaster地址实时监控Region Server的状态,将Region server的上线和下线信息,实时通知给Master 2.3.3.HMaster

可以启动多个HMaster,通过Zookeeper的Master Election机制保证有且仅有一个Master提供服务作用:

为Region server分配region负责region server的负载均衡发现失效的region serve并重新分配其上的regionHDFS上的垃圾文件回收 2.3.4.HRegionServer

Hbase中最核心的模块,主要负责响应用户I/O请求,向HDFS文件系统中读写数据作用:

维护Master分配给它的region,处理对这些region的IO请求负责切分在运行过程中变得过大的regionHRegionServer管理一些列HRegion对象,每个HRegion对应Table中一个Region,HRegion由多个HStore组成,每个HStore对应Table中一个Column Family的存储,Column Family就是一个集中的存储单元集 2.3.5.HStore

Hbase存储的核心,由MemStore和StoreFile组成。

用户写入数据的流程为:client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除 -> 当StoreFiles Compact后,逐步形成越来越大的StoreFile -> 单个StoreFile大小超过一定阈值后,触发Split操作,把当前Region Split成2个Region,原先的Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。如图所示:

2.3.6.HRegion

一个表最开始存储的时候,是一个Region。

一个Region中会有个多个store,每个store用来存储一个列簇。如果只有一个column family,就只有一个store。Region会随着插入的数据越来越多,会进行拆分。默认大小是10G一个。

2.3.7.HLog

在分布式系统环境中,无法避免系统出错或者宕机,一旦HRegionServer意外退出,MemStore中的内存数据就会丢失,引入HLog就是防止这种情况。

工作机制:每个HRegionServer中都会有一个HLog对象,HLog是一个实现Write Ahead Log的类,每次用户操作写入Memstore的同时,也会写一份数据到HLog文件,HLog文件定期会滚动出新,并删除旧的文件(已持久化到StoreFile中的数据)。

当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同Region的log数据拆分,分别放到相应Region目录下,然后再将失效的Region重新分配,领取到这些Region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。

2.4.查询路由

Hbase中存有两张特殊的表,-ROOT-和.meta.:

.meta.:记录了用户表的Region信息,.meta.可以有多个Regoin-ROOT-:记录了.meta.表的Region信息,-ROOT-只有一个Region。Zookeeper中记录了-ROOT-表的location

客户端访问数据流程:

访问zookeeper->访问-ROOT-表->访问.meta.表->找到用户数据的位置,再去访问数据

3.Hbase集群部署 3.1.选择jdk版本和hadoop版本

参考地址:http://hbase.apache.org/1.2/book.html

jdk版本:

hadoop版本:

3.2.下载Hbase

参考地址:http://mirrors.hust.edu.cn/apache/hbase/

3.3.上传Hbase到linux主机,并解压
cd /export/softwares

tar -zxvf hbase-1.3.1-bin.tar.gz -C /export/servers

#解压完成后删除压缩包
rm -rf hbase-1.3.1-bin.tar.gz

#重新命名hbase
cd /export/servers
mv hbase-1.3.1 hbase131

3.4.修改配置文件 3.4.1.regionservers

说明:配置HRegionservers节点列表。

cd /export/servers/hbase131/conf
vi regionservers

node02
node03
3.4.2.hbase-env.sh

说明:配置jdk环境变量,配置集群通过zookeeper管理,不由hbase自己管理。

#配置jdk环境变量
export JAVA_HOME=/export/servers/jdk1.8.0_141

#配置集群交给zookeeper管理
export Hbase_MANAGES_ZK=false
3.4.3.hbase-site.xml

说明:hbase的核心配置文件。

vim hbase-site.xml


  
  
    hbase.rootdir
    hdfs://node01:8020/hbase
  
  
  
     hbase.cluster.distributed
     true
  
  
  
     hbase.master.port
     16000
  
  
  
    hbase.zookeeper.quorum
    node01,node02,node03
  
  
  
    hbase.zookeeper.property.clientPort
    2181
  

3.4.4. backup-masters

说明:配置备用master节点。backup-masters需要手动创建。

vim backup-masters

node02
3.4.5.拷贝hadoop中hdfs的核心配置文件到hbase
cp /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml /export/servers/hbase131/conf/
cp /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml /export/servers/hbase131/conf/

3.5.把配置好的Hbase远程拷贝到其它主机节点
scp -r /export/servers/hbase131 root@node02:/export/servers

scp -r /export/servers/hbase131 root@node03:/export/servers

3.6.配置环境变量
vim /etc/profile

export Hbase_HOME=/export/servers/hbase131
export PATH=$PATH:${Hbase_HOME}/bin

#让环境变量生效
source /etc/profile
3.7.启动Hbase集群 3.7.1.系统时钟同步

说明:Hbase对于集群时间的同步要求非常严格,要求相差不要超过30秒。启动集群前一定要同步系统时间。

ntpdate -u 0.uk.pool.ntp.org
3.7.2.启动zookeeper集群
 /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start

3.7.3.启动hdfs 3.7.4.启动hbase集群
/export/servers/hbase131/bin/start-hbase.sh 

3.7.5.访问hbase的web界面

地址:http://192.168.53.100:16010/master-status

3.7.6.如何启动单节点进程

说明:如果某些主机节点进程未启动正常,可以通过以下方式进行单独启动。

#单独启动master
hbase-daemon.sh start master

#单独启动regionserver
hbase-daemon.sh start regionserver
4.Hbase shell操作 4.1.shell交互界面整体操作 4.1.1.启动shell交互界面
cd /export/servers/hbase131/bin
#启动shell交互界面
hbase shell

#查看集群状态信息
status

#查看版本信息
version
4.1.2.退出shell交互界面
exit

4.1.3.查看帮助信息
#查看全部帮助信息
help

#查看指定的命令帮助信息
help "create"

图一:

图二:

4.2.操作名称空间namespace

说明:在hbase中,namespace相当于关系数据库的database。用于从逻辑上对表进行分组。hbase系统默认有两个缺省的名称空间:

hbase:系统内建表,包括namespace和meta表default:创建表时,如果不指定namespace的表 ,则在default名称空间下

#列出所有名称空间
list_namespace

#查看指定名称空间
describe_namespace 'hbase'

#创建名称空间
create_namespace 'my_ns'

#删除名称空间
drop_namespace 'my_ns'

4.3.操作表table 4.3.1.案例

说明:将关系数据库中的表,转换成hbase中的表进行操作。

mysql数据库表:

hbase表:

4.3.2.命令操作
#列出所有表
list

#创建表(create '名称空间:表名称','列簇')
create 'my_ns:user','base_info'

#插入数据(put '名称空间:表名称','行键','列簇:列名','数据')
put 'my_ns:user','rowkey_10','base_info:username','张三'
put 'my_ns:user','rowkey_10','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_10','base_info:sex','1'
put 'my_ns:user','rowkey_10','base_info:address','北京市'

put 'my_ns:user','rowkey_16','base_info:username','张小明'
put 'my_ns:user','rowkey_16','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_16','base_info:sex','1'
put 'my_ns:user','rowkey_16','base_info:address','北京'

put 'my_ns:user','rowkey_22','base_info:username','陈小明'
put 'my_ns:user','rowkey_22','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_22','base_info:sex','1'
put 'my_ns:user','rowkey_22','base_info:address','上海'

put 'my_ns:user','rowkey_24','base_info:username','张三丰'
put 'my_ns:user','rowkey_24','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_24','base_info:sex','1'
put 'my_ns:user','rowkey_24','base_info:address','河南'

put 'my_ns:user','rowkey_25','base_info:username','陈大明'
put 'my_ns:user','rowkey_25','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_25','base_info:sex','1'
put 'my_ns:user','rowkey_25','base_info:address','西安'

#查询表中的数据
scan 'my_ns:user'

#统计表中行的数量
count  'my_ns:user'

#获取指定行键的数据
get 'my_ns:user','rowkey_10'

#获取指定列簇的数据
get 'my_ns:user','rowkey_10','base_info'

#获取指定列簇中某一列的数据
get 'my_ns:user','rowkey_10','base_info:username'

#获取指定列簇中多列数据
get 'my_ns:user', 'rowkey_10', {COLUMN => ['base_info:username','base_info:sex']}

#删除指定行键,指定列的数据
delete 'my_ns:user','rowkey_10','base_info:username'

#删除指定行键的所有数据
deleteall 'my_ns:user','rowkey_10'

#清空表的数据(慎用)
truncate 'my_ns:user'

#浏览表结构
describe 'my_ns:user'

#操作列簇
alter 'my_ns:user',NAME=>'ext_info'
alter 'my_ns:user','delete'=>'ext_info'

#测试表是否存在
exists 'my_ns:user'

#删除表(首先要禁用表)
disable 'my_ns:user'
drop 'my_ns:user'

5.Hbase Java API操作 5.1.搭建环境 5.1.1.创建项目

5.1.2.导入依赖


    4.0.0

    cn.liny
    spark-teach-day08-01hbase
    1.0-SNAPSHOT
    jar

    
        
        1.3.1
    

    
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
    

    

5.1.3.准备hbase-site.xml配置文件

说明:在resources目录下创建hbase-site.xml,方便通过配置文件信息创建Hbase连接对象。




    
    
        hbase.zookeeper.quorum
        node01,node02,node03
    

5.2封装HbaseUtil工具类
package cn.liny.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;


public class HbaseUtil {

    // 集群配置信息
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    // 操作集群对象
    private static Connection conn = null;
    private static Admin admin = null;

    //集群环境信息=============================================================================start

    
    public static Connection getConnection(){
        // 1.创建一个可以用来管理hbase配置信息的conf对象
        Configuration conf = new Configuration();
        conf.set(ZK_CONNECT_KEY,ZK_CONNECT_VALUE);

        // 2.创建返回Connection对象
        try {
            if(conn == null){
                conn = ConnectionFactory.createConnection(conf);
            }
        } catch (IOException e) {
            System.out.println("创建Connection连接对象发生异常:"+e.getMessage());
        }

        return conn;
    }

    
    public static Admin getAdmin(){
        try {
            admin = conn.getAdmin();
        } catch (IOException e) {
            System.out.println("获取管理员Admin对象发生异常:"+e.getMessage());
        }

        return admin;
    }

    
    public static void closeConnection(){
        if(conn !=null){
            try {
                conn.close();
            } catch (IOException e) {
                System.out.println("释放Connection对象发生异常:"+e.getMessage());
            }
        }
    }

    //集群环境信息=============================================================================end

    //操作名称空间=============================================================================start
    
    public static void createNameSpace(String ns_name){
        // 标识名称空间是否存在,默认
        boolean isExists = false;

        try {
            // 查询已经存在的名称空间
            NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
            for(NamespaceDescriptor ns:list){
                if(ns.getName().equals(ns_name)){
                    System.out.println("目标名称空间:"+ns_name+"已经存在");
                    isExists = true;
                    break;
                }
            }

            // 名称空间不存在,则创建新的名称空间
            if(!isExists){
                System.out.println("目标名称空间:"+ns_name+"不存在,正在准备创建");
                NamespaceDescriptor newNameSpace = NamespaceDescriptor.create(ns_name).build();
                admin.createNamespace(newNameSpace);
            }

        } catch (IOException e) {
            System.out.println("创建名称空间发生异常:"+e.getMessage());
        }

    }

    
    public static void deleteNameSpace(String ns_name){
        System.out.println("正在准备删除名称空间:"+ns_name);
        try {
            admin.deleteNamespace(ns_name);
        } catch (IOException e) {
            System.out.println("删除名称空间发生异常:"+e.getMessage());
        }
    }

    //操作名称空间=============================================================================end

    //表DDL操作=============================================================================start
    
    public static void createTable(String t_name,String ...family){
        // 创建表,必须要指定列簇
        if(family.length<=0){
            System.out.println("创建表,必须要指定列簇");
            return;
        }

        // 定义表名称
        TableName tableName = TableName.valueOf(t_name);
        try {
            // 判断表是否存在
            if(admin.tableExists(tableName)){
                System.out.println("表"+t_name+"已经存在,不需要创建");

                return;
            }

            // 如果表不存在,则执行创建
            System.out.println("表"+t_name+"不存在,正在创建");

            // 定义表的列簇集
            HTableDescriptor htd = new HTableDescriptor(tableName);
            // 添加列簇
            HColumnDescriptor tmpColumn = null;
            for(String hc:family){
                tmpColumn= new HColumnDescriptor(hc);
                htd.addFamily(tmpColumn);
            }

            // 创建表
            admin.createTable(htd);

        } catch (IOException e) {
            System.out.println("创建表发生异常:"+e.getMessage());
        }
    }

    
    public static void getTableInfo(String t_name) throws Exception{
        // 定义表名称对象
        TableName tableName = TableName.valueOf(t_name);

        // 判断表是否存在
        if(admin.tableExists(tableName)){
            // 获取表的列簇集
            HTableDescriptor htd = admin.getTableDescriptor(tableName);

            // 获取表的列簇
            HColumnDescriptor[] columnFamilies = htd.getColumnFamilies();

            // 打印表的详情信息
            for(HColumnDescriptor hcd:columnFamilies){
                System.out.println(hcd.getNameAsString());
            }
        }else{
            System.out.println("表"+tableName.getNameAsString()+"不存在。");
        }

    }

    
    public static void updateTable(String t_name,String family,boolean addOrDelete){

        // 定义表名称对象
        TableName tableName = TableName.valueOf(t_name);
        try{
            // 判断表是否存在
            if(admin.tableExists(tableName)){

                // 细节:对表进行修改,需要先禁用表,修改完成后再启用表
                admin.disableTable(tableName);

                // 获取表描述对象
                HTableDescriptor htd = admin.getTableDescriptor(tableName);

                // 判断是添加还是删除
                if(addOrDelete){
                    // 正在添加列簇
                    System.out.println("正在添加列簇:"+family);
                    // 定义列簇对象
                    HColumnDescriptor hcd = new HColumnDescriptor(family);
                    htd.addFamily(hcd);
                }else{
                    // 正在删除列簇
                    System.out.println("正在删除列簇:"+family);

                    htd.removeFamily(Bytes.toBytes(family));
                }

                // 修改表
                admin.modifyTable(tableName,htd);

            }else{
                System.out.println("表【"+tableName.getNameAsString()+"】不存在.");
            }
        }catch(Exception e){
            System.out.println("修改表发生异常:"+e.getMessage());
        }finally {
            // 重新启用表
            try {
                admin.enableTable(tableName);
            } catch (IOException e) {
                System.out.println("重新启用表发生异常:"+e.getMessage());
            }
        }

    }

    
    public static void deleteTable(String t_name){

        // 定义表名称对象
        TableName tableName = TableName.valueOf(t_name);

        try {
            // 判断表是否存在
            if(admin.tableExists(tableName)){

                // 禁用表
                admin.disableTable(tableName);

                // 删除表
                admin.deleteTable(tableName);

            }else{
                System.out.println("表"+tableName.getNameAsString()+"不存在.");
            }
        } catch (IOException e) {
            System.out.println("删除表发生异常:"+e.getMessage());
        }

    }

    //表DDL操作=============================================================================end

    //表DML操作=============================================================================start
    
    public static void put(String t_name,String rowKey,String family,String[] ck,String[] cv){

        // 定义表的名称对象
        TableName tableName = TableName.valueOf(t_name);


        try {
            // 获取表
            Table table = conn.getTable(tableName);

            // 根据行键创建Put对象
            
            Put put = new Put(Bytes.toBytes(rowKey));

            // 准备数据
            for(int i=0;i cells = result.listCells();
            for(Cell c:cells){
                //打印rowKey,family,qualifier,value
                System.out.println(Bytes.toString(CellUtil.cloneRow(c))// 行键
                                    + "==> " + Bytes.toString(CellUtil.cloneFamily(c))// 列簇
                                    + "{" + Bytes.toString(CellUtil.cloneQualifier(c))// 列
                                    + ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");// 值

            }

            // 释放资源
            table.close();

        } catch (IOException e) {
            System.out.println("根据rowKey获取数据发生异常:"+e.getMessage());
        }

    }

    
    public static void scan(String t_name,String ... start_end_rowKey){
        // 定义表的名称
        TableName tableName = TableName.valueOf(t_name);

        try {
            // 获取表
            Table  table = conn.getTable(tableName);

            // 定义扫描器
            Scan scan = new Scan();

            // 设置扫描范围
            if(start_end_rowKey.length>1){
                scan.setStartRow(Bytes.toBytes(start_end_rowKey[0]));
                scan.setStopRow(Bytes.toBytes(start_end_rowKey[1]));
            }

            // 获取扫描结果集
            ResultScanner scanner = table.getScanner(scan);
            Result result = null;
            while ((result = scanner.next()) != null){
                // 获取单元格
                List cells = result.listCells();

                // 打印 rowKey,family,qualifier,value
                for(Cell c:cells){
                    System.out.println(Bytes.toString(CellUtil.cloneRow(c))
                                        + "==> " + Bytes.toString(CellUtil.cloneFamily(c))
                                        + "{" + Bytes.toString(CellUtil.cloneQualifier(c))
                                        + ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");
                }

            }

            // 释放资源
            table.close();

        } catch (IOException e) {
            System.out.println("范围扫描数据发生异常:"+e.getMessage());
        }

    }

    
    public static void del(String t_name,String rowKey,String ... family_qualifier){
        // 定义表名称
        TableName tableName=TableName.valueOf(t_name);

        try {
            // 获取表
            Table  table = conn.getTable(tableName);

            // 创建删除对象
            Delete delete = new Delete(Bytes.toBytes(rowKey));

            // 明确删除某一列
            if(family_qualifier.length>1){
                delete.addColumn(Bytes.toBytes(family_qualifier[0]),Bytes.toBytes(family_qualifier[1]));
            }

            // 执行删除
            table.delete(delete);

            // 释放资源
            table.close();

        } catch (IOException e) {
            System.out.println("删除表数据发生异常:"+e.getMessage());
        }

    }

    //表DML操作=============================================================================end

    //Filter查询===========================================================================start
    
    public static void scanByFilter(String t_name,Filter filter){
        // 定义表的名称
        TableName tableName = TableName.valueOf(t_name);

        try {
            // 获取表
            Table table = conn.getTable(tableName);

            // 定义扫描器
            Scan scan = new Scan();
            scan.setFilter(filter);

            // 执行扫描
            ResultScanner scanner = table.getScanner(scan);
            Result result = null;
            while ((result = scanner.next()) != null){
                // 获取单元格
                List cells = result.listCells();

                // 打印 rowKey,family,qualifier,value
                for(Cell c:cells){
                    System.out.println(Bytes.toString(CellUtil.cloneRow(c))
                            + "==> " + Bytes.toString(CellUtil.cloneFamily(c))
                            + "{" + Bytes.toString(CellUtil.cloneQualifier(c))
                            + ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");
                }

            }

            // 释放资源
            table.close();

        } catch (IOException e) {
            System.out.println("过滤查询数据发生异常:"+e.getMessage());
        }

    }


    //Filter查询=============================================================================end

}
5.3.操作名称空间
package cn.liny.hbase;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;


public class HbaseNameSpace {

    public static void main(String[] args) throws Exception{
       // 1.获取连接对象
        Connection conn = HbaseUtil.getConnection();

        // 2.获取管理员对象Admin,执行管理任务操作
        Admin admin = HbaseUtil.getAdmin();

        // 3.操作名称空间namespace
        // 3.1.列出所有的名称空间
        System.out.println("-----------------3.1.列出所有名称空间--------------------");
        NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
        for(NamespaceDescriptor ns:list){
            System.out.println("名称空间:"+ns.getName());
        }

        // 3.2.创建名称空间
        System.out.println("-----------------3.2.创建名称空间-------------------------");
        HbaseUtil.createNameSpace("my_ns1");

        // 3.3.删除名称空间
        System.out.println("-----------------3.3.删除名称空间-------------------------");
        HbaseUtil.deleteNameSpace("my_ns2");
        
        // 2.关闭连接对象
        HbaseUtil.closeConnection();

    }
}
5.4.操作表

5.4.1.表DDL操作
package cn.liny.hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;


public class HbaseTableDDL {

    public static void main(String[] args) throws Exception {

        // 1.获取连接对象
        Connection conn = HbaseUtil.getConnection();

        // 2.获取管理员对象Admin,执行管理任务操作
        Admin admin = HbaseUtil.getAdmin();

        // 3.表DDL操作
        // 3.1.查询所有表
        System.out.println("----------------------3.1.查询所有表------------------------");
        TableName[] tableNames = admin.listTableNames();
        for(TableName t:tableNames){
            System.out.println("表名称:"+t.getNameAsString());
        }

        // 3.2.创建表
        System.out.println("----------------------3.2.创建表-----------------------------");
        HbaseUtil.createTable("student","base_info");

        // 3.3.获取表的详情信息
        System.out.println("----------------------3.3.获取表的详情信息---------------------");
        HbaseUtil.getTableInfo("student");

        // 3.4.修改表
        System.out.println("----------------------3.4.修改表-----------------------------");
        HbaseUtil.updateTable("student","ext_info",true);

        // 3.5.删除表
        System.out.println("----------------------3.5.删除表-----------------------------");
        HbaseUtil.deleteTable("student");


        // 关闭连接对象
        HbaseUtil.closeConnection();

    }
}
5.4.2.表DML操作
package cn.liny.hbase;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;


public class HbaseTableDML {

    public static void main(String[] args) throws Exception{

        // 1.获取连接对象
        Connection conn = HbaseUtil.getConnection();

        // 2.获取管理员对象Admin,执行管理任务操作
        Admin admin = HbaseUtil.getAdmin();

        // 3.表DML操作
        // 3.1.创建表
        System.out.println("----------------------3.1.创建表-----------------------------");
        HbaseUtil.createTable("my_ns:user","base_info");

        // 3.2.添加(更新)数据
        System.out.println("----------------------3.2.添加(更新)数据---------------------");
        // 准备数据
        String row_key="rowkey_10";
        String[] ck = {"username","sex","address","birthday"};
        String[] cv = {"张三","1","北京市","2014-07-10"};

        HbaseUtil.put("my_ns:user",row_key,"base_info",ck,cv);

        // 3.3.根据rowKey获取数据
        System.out.println("----------------------3.3.根据rowKey获取数据---------------------");
        HbaseUtil.get("my_ns:user",row_key);

        // 3.4.范围扫描数据
        System.out.println("----------------------3.4.范围扫描数据----------------------------");
        //HbaseUtil.scan("my_ns:user");// 全表扫描
        HbaseUtil.scan("my_ns:user","rowkey_16","rowkey_26");// 指定rowKey范围扫描

        // 3.5.删除数据
        System.out.println("----------------------3.5.删除数据----------------------------");
       // HbaseUtil.del("my_ns:user","rowkey_16");// 删除整行
        HbaseUtil.del("my_ns:user","rowkey_22","base_info","address");// 明确删除列
        
        // 关闭连接对象
        HbaseUtil.closeConnection();
    }
}
5.5.Hbase过滤查询 5.5.1.关于过滤查询

基础API中的查询操作,在面对大量数据的时候是非常苍白的,Hbase提供了高级的查询方法:Filter。

Filter根据簇、列、版本等更多的条件来对数据进行过滤,基于Hbase提供的三维有序(主键有序、列有序、版本有序),这些Filter可以高效的完成查询过滤的务。

要完成过滤的操作,至少需要两个参数:

抽象操作符

LESS :<

LESS_OR_EQUAL: <=

EQUAL: =

NOT_EQUAL: <>

GREATER :>

GREATER_OR_EQUAL :>=

NO_OP :排除所有

比较器

BinaryComparator :按字节索引顺序比较指定字节数组,采用 Bytes.compareTo(byte[])

BinaryPrefixComparator: 与BinaryComparator相同,仅比较左端的数据是否相同

NullComparator :判断给定的是否为空

BitComparator: 按位比较

SubstringComparator: 判断提供的子串是否出现在 value 中

5.5.2.过滤查询案例

以下案例演示了常用过滤器的使用:

行键过滤器:RowFilter列簇过滤器:FamilyFilter列过滤器:QualifierFilter值过滤器:ValueFilter行键前缀过滤器:PreFixFilter列前缀过滤器:ColumnPrefixFilter

package cn.liny.hbase;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.filter.*;


public class HbaseFilter {

    public static void main(String[] args) {

        // 1.获取连接对象
        Connection conn = HbaseUtil.getConnection();

        // 2.行键过滤器:RowFilter
        System.out.println("----------------------2.行键过滤器:RowFilter-----------------------------");
        // 定义过滤器
        Filter rowFilter = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator("rowkey_10".getBytes()));
        HbaseUtil.scanByFilter("my_ns:user",rowFilter);

        //3.列簇过滤器: FamilyFilter
       System.out.println("----------------------3.列簇过滤器: FamilyFilter-----------------------------");
        // 定义过滤器
        Filter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("ext_info".getBytes()));
        HbaseUtil.scanByFilter("my_ns:user",familyFilter);

        //4.列过滤器 :QualifierFilter
        System.out.println("----------------------4.列过滤器 :QualifierFilter-----------------------------");
        // 定义过滤器
        Filter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("username".getBytes()));
        HbaseUtil.scanByFilter("my_ns:user",qualifierFilter);

        //5.值过滤器 :ValueFilter
        System.out.println("----------------------5.值过滤器 :ValueFilter-----------------------------");
        // 定义过滤器
        Filter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("张三"));
        HbaseUtil.scanByFilter("my_ns:user",valueFilter);

        // 6.前缀过滤器:PrefixFilter----针对行键
        System.out.println("----------------------6.前缀过滤器:PrefixFilter----针对行键-----------------------------");
        // 定义过滤器
        Filter prefixFilter = new PrefixFilter("rowkey_1".getBytes());
        HbaseUtil.scanByFilter("my_ns:user",prefixFilter);

        // 7.列前缀过滤器:ColumnPrefixFilter
        System.out.println("----------------------7.列前缀过滤器:ColumnPrefixFilter-----------------------------");
        // 定义过滤器
        Filter columnPrefixFilter = new ColumnPrefixFilter("a".getBytes());
        HbaseUtil.scanByFilter("my_ns:user",columnPrefixFilter);

        // 关闭连接对象
        HbaseUtil.closeConnection();
    }
}
6.Hbase的rowkey设计原则

Hbase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对Hbase中的数据进行快速定位。

Hbase中rowkey可以唯一标识一行记录,三种查询方式

通过get方式,指定rowkey获取唯一一条记录通过scan方式,设置startRow和stopRow参数进行范围匹配全表扫描,即直接扫描整张表中所有行记录 6.1.Rowkey长度原则

rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10-100bytes,以byte[]形式保存,一般设计成定长。

建议越短越好,不要超过16个字节

案例说明:

数据的持久化文件HFile中是按照Key/Value存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。

目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。

6.2.Rowkey 散列原则

如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。

如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

6.3.Rowkey唯一原则

必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。但是这里的量不能太大,如果太大需要拆分到多个节点上去 。

6.4.具体设计参考 6.4.1.加盐

加盐指的是在rowkey的前面分配随机数,当给rowkey随机前缀后,它就能分布到不同的region中,这里的前缀应该和你想要数据分散的不同的region的数量有关。

案例需求

以电信公司为例 。当我们去电信公司打印通话记录。对于通话记录来说,每个人每月可能都有很多通话记录,同时电信的用户以亿计。这样的海量信息,可以考虑存入hbase当中 。那么在hbase中如何设计这样一张表呢?

确定需要保存的信息

对于通话记录 ,有主叫和被叫 信息,有通话时长,有通话时间 ,有主叫的位置信息,和被叫的位置信息 。那么可以确定要保存的信息:主叫、被叫、时长、时间、主叫位置、被叫位置

如何设计hbase表

hbase表是依靠rowkey来定位的,我们应该尽可能多的将查询的信息编入rowkey当中。设计通话记录的rowkey时,需要将能唯一确定该条记录的数据编入rowkey当中:主叫、被叫、时间

17766657999 18699886666 201806121502 #主叫,被叫,时间

该rowkey合理吗

该rowkey可能会造成热点问题。例如电话以177开头,hbase集群有100台,那么该数据可能只被存入一台或者两台机器的region中。当需要打印通话记录时,就只有一台机器提供服务。而如果数据均匀分散到100机器中,则是整个集群提供服务。两者之间效率速度差了不止一个数量级 。

因此我们可以考虑:hbase集群有100台,可以把在0-99之间中的随机数字,添加到rowkey首部 。

10 17766657999 18699886666 201806121502 #随机数,主叫,被叫,时间
6.4.2.预分区

hbase会自动处理region拆分,当region的大小到达一定阈值后,region将被拆分成两个。这个过程当中,会出现两个问题:

第一点还是热点问题,数据会继续往一个region中写,出现写热点问题第二点是拆分合并风暴,当用户的region大小以恒定的速度增长,region的拆分会在同一时间发生,因为同时需要压缩region中的存储文件,这个过程会重写拆分后的region,这将会引起磁盘I/O上升 。

要解决以上问题,预分区(预先创建hbase表分区 )是一种很好的方案,可以和加盐结合起来使用。使用预分区需要明确rowkey的取值范围和构成逻辑 。

案例需求:

还是上面电信电话详单表 ,通过加盐我们得到的 rowkey构成是:随机数+主叫+被叫+时间 ,如果当前并没有100台机器,只有10台。但是预计未来将扩展到100台的规模。那么仍然设计0到99的随机数,将以主叫177开头的通话记录分配到十个region当中,将随机数均分成十个区域,范围如下:

-10,10-20,20-30,30-40,40-50,50-60,60-70,70-80,80-90,90-

将预分区存入数组当中,当插入数据时,先根据插入数据的首部随机数,判断分区位置,再进行插入数据。这样也能使得各台节点负载均衡。

6.4.3.哈希

哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群。同时读是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

6.4.4.反转

对于手机号码,比较固定开头的rowkey(177,159,188),而后几位都是随机的,没有规律的。可以将手机号反转之后作为rowkey,避免热点问题 。

这就是rowkey设计的另一种方法:反转。通过反转固定长度或者数字格式的rowkey ,把rowkey中经常改变的部分(最没有意义的部分)放在前面 。可以有效的随机rowkey ,不过这样一来就牺牲了rowkey的有序性 。

7.MapReduce操作Hbase 7.1.创建项目

7.2.导入依赖


    4.0.0

    cn.liny
    spark-teach-day08-02hbase-hdfs
    1.0-SNAPSHOT

    jar

    
        
        1.3.1
    

    
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
        
    

    

7.3.MapReduce从hdfs读取数据到Hbase 7.3.1.案例代码
package cn.liny.hbase.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;


public class HbaseFromHDFS {

    public static void main(String[] args) throws Exception{

        // 创建配置对象
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        // 创建Job
        Job job = Job.getInstance(conf,"HbaseFromHDFS");
        job.setJarByClass(HbaseFromHDFS.class);

        // 设置map
        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 设置输入目录
        FileInputFormat.addInputPath(job, new Path(args[0]));
        TableMapReduceUtil.initTableReducerJob(args[1],HDFSToHbaseReducer.class, job);
        job.waitForCompletion(true);

    }

    // HDFSToHbaseMapper
    public static class HDFSToHbaseMapper extends Mapper {

        private Text outKey = new Text();
        private Text outValue = new Text();

        // map函数
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(" ");


            outKey.set(splits[0]);
            outValue.set(splits[1]+" "+splits[2]+" "+splits[3]+" "+splits[4]);

            context.write(outKey, outValue);
        }

    }

    // HDFSToHbaseReducer
    public static class HDFSToHbaseReducer extends TableReducer {

        // reduce函数
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

            Put put = new Put(key.getBytes());

            String family="base_info";
            for (Text v : values) {
                String[] split = v.toString().split(" ");

                if(split[0]!=null && !"NULL".equals(split[0])){
                    put.addColumn(family.getBytes(),"username".getBytes(),split[0].getBytes());
                }

                if(split[1]!=null && !"NULL".equals(split[1])){
                    put.addColumn(family.getBytes(),"sex".getBytes(),split[1].getBytes());

                }

                if(split[2]!=null && !"NULL".equals(split[2])){
                    put.addColumn(family.getBytes(),"birthday".getBytes(),split[2].getBytes());

                }

                if(split[3]!=null && !"NULL".equals(split[3])){
                    put.addColumn(family.getBytes(),"address".getBytes(),split[3].getBytes());

                }

            }

            // 输出
           context.write(NullWritable.get(),put);
        }
    }
}
7.3.2.配置执行环境

准备数据文件

user.txt:

rowkey_10 张三 1 2014-07-10 北京市
rowkey_16 张小明 1 2014-07-10 北京
rowkey_22 陈小明 1 2014-07-10 上海
rowkey_24 张三丰 1 2014-07-10 河南
rowkey_25 陈大明 1 2014-07-10 西安

上传到hdfs:

#创建hdfs目录
hdfs dfs -mkdir -p /anan/hbase/input

#上传文件
hdfs dfs -put /usr/local/develop/testdata/user.txt /anan/hbase/input

创建hbase表

create 'my_ns:huser','base_info'

配置hadoop依赖hbase相关包

vi /usr/local/develop/hadoop274/etc/hadoop/hadoop-env.sh

#配置HADOOP_CLASSPATH环境变量,指定hbase依赖包目录
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/local/develop/hbase131/lib
public class HbaseToHDFS {

    public static void main(String[] args) throws Exception{
        // 创建配置对象
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        // 创建Job
        Job job = Job.getInstance(conf,"HbaseToHDFS");
        job.setJarByClass(HbaseToHDFS.class);

        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(args[0], scan, HbaseToHDFSMapper.class, Text.class, Text.class, job);

        // 设置输入目录
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);

    }

    // mapper
    public static class HbaseToHDFSMapper extends TableMapper {

        Text outKey = new Text();
        Text outValue = new Text();

        
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            StringBuilder builder =new  StringBuilder();

            boolean isContainsColumn = value.containsColumn("base_info".getBytes(),"username".getBytes());
            if(isContainsColumn) {
                List listCells = value.getColumnCells("base_info".getBytes(), "username".getBytes());
                Cell cell = listCells.get(0);

                byte[] clonevalue = CellUtil.clonevalue(cell);
                builder.append(Bytes.toString(clonevalue)).append(" ");

            }

           isContainsColumn = value.containsColumn("base_info".getBytes(),"sex".getBytes());
            if(isContainsColumn) {
                List listCells = value.getColumnCells("base_info".getBytes(), "sex".getBytes());
                Cell cell = listCells.get(0);

                byte[] clonevalue = CellUtil.clonevalue(cell);
                builder.append(Bytes.toString(clonevalue)).append(" ");

            }

            isContainsColumn = value.containsColumn("base_info".getBytes(),"birthday".getBytes());
            if(isContainsColumn) {
                List listCells = value.getColumnCells("base_info".getBytes(), "birthday".getBytes());
                Cell cell = listCells.get(0);

                byte[] clonevalue = CellUtil.clonevalue(cell);
                builder.append(Bytes.toString(clonevalue)).append(" ");

            }

            isContainsColumn = value.containsColumn("base_info".getBytes(),"address".getBytes());
            if(isContainsColumn) {
                List listCells = value.getColumnCells("base_info".getBytes(), "address".getBytes());
                Cell cell = listCells.get(0);

                byte[] clonevalue = CellUtil.clonevalue(cell);
                builder.append(Bytes.toString(clonevalue)).append(" ");

            }

            // 输出
            outKey.set(Bytes.toString(key.get()));
            outValue.set(builder.toString());
            context.write(outKey,outValue);

        }

    }

    //reducer
    public static class HbaseToHDFSReducer extends Reducer {

        Text outValue = new Text();

        protected void reduce(Text key, Iterable values,Context context)
                throws IOException, InterruptedException {

            outValue.set(values.toString());
            context.write(key, outValue);
        }

    }
}
7.4.2.打包,提交任务执行

hadoop提交任务:

hadoop jar /usr/local/develop/testjar/spark-teach-day08-02hbase-hdfs-1.0-SNAPSHOT.jar 
cn.itheima.hbase.hdfs.HbaseToHDFS 
my_ns:huser /anan/hbase/output

执行结果:

hdfs dfs -cat /anan/hbase/output/part-r-00000

8.spark操作Hbase 8.1.创建项目

8.2.导入依赖


    4.0.0

    cn.liny
    spark-teach-day08-03hbase-spark
    1.0-SNAPSHOT

    jar

    
        2.11.8
        2.0.2
        
        1.3.1
    

    
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
        
            org.apache.spark
            spark-streaming_2.11
            ${spark.version}
        
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
        
    

    
        
            
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                        
                    
                
            
            
            
                maven-assembly-plugin
                
                    
                        
                            cn.itheima.hbase.spark.SparkToHbase
                        
                    
                    
                        jar-with-dependencies
                    
                
            
        
    

    

8.3.spark写入数据到Hbase 8.3.1.编写案例代码
package cn.liny.hbase.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkToHbase {

  // 执行入口
  def main(args: Array[String]): Unit = {

    // 1.创建Hbase配置
    val conf: Configuration = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

    // 2.指定输出格式和输出表名
    val jobConf: JobConf = new JobConf(conf,this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"my_ns:s_user")

    // 3.创建SparkContext对象
    // 本地测试使用
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkToHbase").setMaster("local[2]")
    //打包到集群环境使用
    //val sparkConf: SparkConf = new SparkConf().setAppName("SparkToHbase")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 构建RDD数据
    val dataRDD = List(("rowkey_1","lilei","16"), ("rowkey_2","hanmeimei","17"), ("rowkey_3","lucy","18"))
    val resultRDD: RDD[(ImmutableBytesWritable, Put)] = sc.parallelize(dataRDD).map(convert)

    // 写入Hbase表
    resultRDD.saveAsHadoopDataset(jobConf)

  }

  // 定义元组转换函数
  // (1,"lilei",18)
  def convert(tuple: (String, String, String)) = {
    // 定义put
    val put = new Put(Bytes.toBytes(tuple._1))

    // 添加列数据
    put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("name"),Bytes.toBytes(tuple._2))
    put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("age"),Bytes.toBytes(tuple._3))

    // 返回
    (new ImmutableBytesWritable, put)
  }

}
8.3.2.配置执行环境

创建hbase表

create 'my_ns:s_user','base_info'

#查询数据
scan 'my_ns:s_user'

如果要在集群环境使用,则需要启动spark集群,提交命令如下:

spark-submit --class cn.itheima.hbase.spark.SparkToHbase 
--master spark://hadoop01:7077 
--executor-memory 1g --total-executor-cores 1 
/usr/local/develop/testjar/spark-teach-day08-03hbase-spark-1.0-SNAPSHOT-jar-with-dependencies.jar
8.3.3.执行结果

8.4.spark从Hbase读取数据 8.4.1.编写案例代码
package cn.liny.hbase.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkFromHbase {

  // 执行入口
  def main(args: Array[String]): Unit = {

    // 1.创建Hbase配置
    val conf: Configuration = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181")

    // 2.设置查询的表名
    conf.set(TableInputFormat.INPUT_TABLE, "my_ns:s_user")

    // 3.创建SparkContext对象
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkFromHbase").setMaster("local[2]")
    // 设置序列化
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))

    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 4.通过SparkContext 提供的newAPIHadoopRDD获取内容
    val dataRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    //5.遍历输出
    dataRDD.foreach{ case (_,result) =>
      val key = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("base_info".getBytes,"name".getBytes))
      val age = Bytes.toString(result.getValue("base_info".getBytes,"age".getBytes))

      println("Row key:"+key+",name="+name+",age="+age)
    }

  }

}
8.4.2.执行结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775106.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号