栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据学习教程SD版第十六篇【Hbase】

大数据学习教程SD版第十六篇【Hbase】

Hbase 列式数据库

分布式 可扩展 基于HDFS的海量数据存储

K-V 存储 ,更像多维Map,稀疏存储

底层全是字节码存储,所以数据没有类型一说

本身不能做数据分析

1. Hbase 简介
  • 逻辑结构

r列 --rf列族 表纵向切分 Store --rs行键 – region表横向切分 切片

  • 物理存储结构

RowKey 行键 --Column Family 列族 --Column Qualifier 列 --TimeStamp 时间戳 --Type 操作类型 --Value 值

  • 数据模型
  1. Name Space --> Database
  2. Region --> Table
  3. Row --> 一个Rowkey+ 多个列
  4. Column --> Column
  5. Time Stamp --> Version
  6. Ceil --> {rowkey,columnfamily,columnqualifier,timestamp} 最小单元
2. Hbase 简单架构
  1. RegionServer DML
    1. Data:get 、put、delete
    2. Region:split、compact
  2. Master DDL
    1. Table:create 、delete 、alter
    2. RegionServer : 分配 regions 到 每个 RgionServer
3. Hbase 安装

先要启动 HDFS 、ZK ,hbase-default.xml 在hbase-common的jar包下

  1. 下载并解压安装包
  2. 修改配置文件

hbase-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
#export Hbase_MASTER_OPTS="$Hbase_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
#export Hbase_REGIONSERVER_OPTS="$Hbase_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
export Hbase_MANAGES_ZK=false

hbase-site.xml

  
    hbase.rootdir
    hdfs://hadoop102:8020/hbase/
    
    
    hbase.cluster.distributed
    true
  
    
    hbase.zookeeper.quorum
    hadoop102,hadoop103,hadoop104
  
    
    hbase.zookeeper.property.dataDir
    /opt/module/zookeeper-3.5.7/zkData
  

regionservers

hadoop102
hadoop103
hadoop104
  1. 设置hadoop的core,hdfs-site.xml软连接到hbase下
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/
  1. 分发配置

  2. 启动

bin/start-hbase.sh
4. Hbase Shell
bin/hbase shell
# DDL
## 1.query
list
describe 'test1'
## 2.create
create 'test1','info'
## 3.update
alter 'test1', {NAME => 'info', VERSIONS => '3'}
## 4.delete
disable 'test1'
drop 'test1'
## ns 删除ns,必须先删除ns下所有table
list_namespace
list_namespace_tables 'default'

# DML
## 1.put
put 'test1','1001','info:id','001'
## 2.get、scan: 获取最新时间戳(版本)的数据
get 'test1','1001'
get 'test1','1001','info'
get 'test1','1001','info:id'
scan 'test1'
scan 'test1',{STARTROW => '1001',STOPROW => '1003'}
## 扫描10个版本内的数据
scan 'test1',{ RAW => true  ,VERSIONS => 10 }
## 3.update:加一条最新时间戳的数据,不会删除原来版本数据
put 'test1','1003','info:name','wangwuwu'
## 4.delete :标记删除
delete 'test1','1001','info:age'
deleteall 'test1','1001'
truncate 'test1'

##多 version 存储与查询
alter 'test2',{ NAME => 'info', VERSIONS => 3 }
get 'test2', '1001', {COLUMN => 'info:name',VERSIONS => 3}

COLUMN                                    CELL
 info:name                                timestamp=1640829889032, value=ccc
 info:name                                timestamp=1640829885058, value=bbb
 info:name                                timestamp=1640829878870, value=aaa
 
 # flush -> hdfs
 flush 'test2'
5. Hbase 详细架构

  • 三大组件
  1. HDFS

    • HDFS Client 把StoreFile 存储在DataNode
    • HDFS DataNode 实际存储数据
  2. Hbase

  • HMaster 元数据管理,RegionServer集群管理
  • HRegionServer
    • HLog 预写日志,记录数据操作
    • HRegion 表级数据,表的切片
      • Store 列族级别数据
        • MemoStore 内存数据
        • StoreFile 磁盘数据
          • HFile 存储格式K-V
  1. ZK
    • 与Client交互,对接数据的操作
    • 与HMater交互 ,存储元数据位置
6. Hbase 读写流程

读比写慢

6.1 Hbase 写数据

如果缓存中有对应信息的话,直接进行写操作即可

  1. Client 向ZK 请求 meta表 位置
  2. Client 获取到meta 所在节点,请求meta 信息,数据存在哪个节点,写入缓存
  3. Client 获取到 put数据 所在节点,请求put数据
  4. put 数据的节点 把数据先写入WAL和MemoStore,并返回ack
6.2 Hbase Flush

从 memostore 刷写到 Storefile,刷写时机

  1. Regionserver大小级别:默认 headp 0.4 , 0.95
        hbase.regionserver.global.memstore.size  
         
        hbase.regionserver.global.memstore.size.lower.limit  
          
  1. Region大小级别:默认128M
        hbase.hregion.memstore.flush.size  
        134217728  
  1. 时间级别:默认1h
        hbase.regionserver.optionalcacheflushinterval  
        3600000  
6.3 Hbase 读数据
  1. Clinet 向 ZK 请求 meta 位置
  2. Client 向meta 所在节点请求 数据所在节点
  3. Client 向数据所在节点请求 读操作
  4. 读的是memostore+storefile 数据,只不过会把storefile 加载进block cache,与memostore 数据比较时间戳,返回最大时间戳的数据
6.4 Hbase Compact

Hfile 文件合并

  1. Minor Compactions 大量小文件合并,不会删除小版本数据
  2. Major Compactions 大文件合并,会删除数据 默认7天
6.5 Hbase Split

Hbase 自己切分会导致数据倾斜 64M -> 256M -> …… 10G 官方建议一个列族,防止出现小文件

7. Hbase API 7.1 DDL API
package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class APITest {

    private static Connection conn;
    private static Admin admin;

    static {
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public static boolean isTableExists(String tableName) throws IOException {
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        return exists;
    }

    
    public static void createTable(String tableName, String... family) throws IOException {

        if (isTableExists(tableName)) {
            return;
        }

        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for (String f : family) {
            HColumnDescriptor familyDesc = new HColumnDescriptor(f);
            tableDesc.addFamily(familyDesc);
        }
        admin.createTable(tableDesc);
    }

    
    public static void dropTable(String tableName) throws IOException {
        if (!isTableExists(tableName)) {
            return;
        }
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    
    public static void createNameSpace(String ns) {
        NamespaceDescriptor nsDesc = NamespaceDescriptor.create(ns).build();
        try {
            admin.createNamespace(nsDesc);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IOException {
        System.out.println(isTableExists("bigdata:test4"));
//        createTable("bigdata:test4", "info1", "info2");
//        dropTable("test4");
//        createNameSpace("bigdata");
        System.out.println(isTableExists("bigdata:test4"));

    }

}
7.2 DML API

关于删除级别与删除标记:

  1. RowKey级别 DeleteFamily
  2. Rowkey+CF级别 DF
  3. RowKey+CF+CN级别 DeleteColumn
package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class APITest2 {

    private static Connection conn;

    static {
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public static void putData(String tableName, String rowKey, String family, String column, String value) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));

        table.put(put);
        table.close();
    }

    
    public static void getData(String tableName, String rowKey, String family, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
//        get.addFamily(Bytes.toBytes(family));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            String c = Bytes.toString(CellUtil.cloneQualifier(cell));
            String v = Bytes.toString(CellUtil.clonevalue(cell));
            System.out.println("Family:" + cf + " Column:" + c + " Value:" + v);
        }
    }

    
    public static void scanData(String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String cf = Bytes.toString(CellUtil.cloneFamily(cell));
                String c = Bytes.toString(CellUtil.cloneQualifier(cell));
                String v = Bytes.toString(CellUtil.clonevalue(cell));
                System.out.println("Rowkey:" + row + " Family:" + cf + " Column:" + c + " Value:" + v);
            }
        }
    }

    
    public static void deleteData(String tableName, String rowKey, String cf, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        table.delete(delete);
    }


    public static void main(String[] args) throws IOException {
//        putData("bigdata:test4", "1001", "info1", "age", "18");
//        getData("bigdata:test4", "1001", "info1", "name");
        scanData("bigdata:test4");
//        deleteData("bigdata:test4", "1003", "", "");
//        scanData("bigdata:test4");
    }

}
8. Hbase MR

Hbase 借助 MapReduce 来实现数据分析

# 1.准备工作

# 1.1 在 hadoop-env.sh hadoop_classpath for循环下面添加
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/*
# 1.2 加一个参数变量
export Hbase_HOME=/opt/module/hbase-1.3.1

# 重启集群
  1. 统计表行数(官方自带)
yarn jar lib/hbase-server-1.3.1.jar rowcounter {tablename}
  1. 导入数据到Hbase Table(官方自带)
# 需要创建table
create 'fruit','info'

yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=Hbase_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:8020/test.tsv
  1. 当然也可以自定义MapReduce程序
# 按照官方文档实例编写
https://hbase.apache.org/book.html#mapreduce
9. Hbase Hive

Hbase 借助于Hive来实现数据分析

# 1.copy hbase jar -> hive 
# 2. 把zk信息配置到hive-site.xml中

 
    hive.zookeeper.quorum
    hadoop102,hadoop103,hadoop104
  
  
    hive.zookeeper.client.port
    2181
  

# 3. 在hive中创建hbase关联的表

create table hive_hbase_test(id string,name string, age int) 
stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler' 
with serdeproperties("hbase.columns.mapping" = ":key,cf:name,cf:age")
tblproperties ("hbase.table.name" = "hive_hbase_test");

# 4.在hive表中进行查询分析即可
10. Hbase 存储优化

Hbase 自带高可用,后续启动的Mater后自动变成Standby状态

可以在conf下新建 backup-masters文件,写入hadoop103 hadoop104,会在启动时启动三个Master,两个处于备份状态

10.1 建表预分区

按照字典排序和比较

# 分5个区:根据数据量适当分
create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
10.2 Rowkey设计

设计Rowkey 尽量均匀散列分布到每个预分区中

  • 散列Rowkey
    1. 随机数、hash
    2. 反转、拼接(加盐)
10.3 基础参数
  1. Hstore大小,默认10G
  2. flush,compact,split大小
    eHandler’
    with serdeproperties(“hbase.columns.mapping” = “:key,cf:name,cf:age”)
    tblproperties (“hbase.table.name” = “hive_hbase_test”);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698960.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号