栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据高级开发工程师——HBase学习笔记(2)

大数据高级开发工程师——HBase学习笔记(2)

文章目录
  • 大数据数据库之Hbase
    • Hbase架构原理
      • Hbase的数据存储原理
      • Hbase读数据流程
      • Hbase写数据流程
    • Hbase的flush、compact机制
      • Flush触发条件
        • 1. MemStore级别限制
        • 2. Region级别限制
        • 3. HRegionServer级别限制
        • 4. HLog数量上限
        • 5. 定期刷新MemStore
        • 6. 手动Flush
      • Flush流程
      • Compact合并机制
        • 1. minor compaction 小合并
        • 2. major compaction 大合并
    • Hbase表的预分区
      • 为何要预分区?
      • 预分区原理
      • 手动指定预分区
        • 方式一
        • 方式二
        • 方式三
    • Region 合并
      • region 合并说明
      • 如何进行region合并
        • 1. 通过Merge类冷合并Region
        • 2. 通过online_merge热合并Region
    • Hbase集成MapReduce
      • 实战一
        • 1. 需求
        • 2. 实现
      • 实战二:hdfs => hbase
        • 1. 需求
        • 2. 编码实现
      • 实战三:bulkload
        • 1. 需求
        • 2. 实现原理
        • 3. 编码实现
    • Hbase集成Hive
      • Hbase与Hive的对比
      • 整合配置
        • 1. 拷贝jar包
        • 2. 修改hive配置文件
        • 3. 修改hive-env.sh配置文件
      • 需求一:将hive表当中分析的结果保存到hbase表当中
        • 1. hive中创建表
        • 2. 准备数据
        • 3. 创建hive管理表与Hbase进行映射
        • 4. hbase当中查看表hbase_score
      • 需求二:创建hive外部表,映射Hbase当中已有的表模型
        • 1. Hbase当中创建表并手动插入加载一些数据
        • 2. 建立hive的外部表,映射Hbase当中的表以及字段
    • Hbase表的rowkey设计
      • rowkey长度原则
      • rowkey散列原则
      • rowkey唯一原则
    • Hbase表的热点
      • 什么是热点?
      • 热点的解决方案
        • 1. 预分区
        • 2. 加盐
        • 3. 哈希
        • 4. 反转
    • region分裂策略
      • ConstantSizeRegionSplitPolicy
      • IncreasingToUpperBoundRegionSplitPolicy
      • SteppingSplitPolicy
      • KeyPrefixRegionSplitPolicy
      • DelimitedKeyPrefixRegionSplitPolicy
      • DisabledRegionSplitPolicy

大数据数据库之Hbase Hbase架构原理 Hbase的数据存储原理

  • 一个 HRegionServer 会负责管理多个 Region;
  • 一个 Region 包含很多个 Store:
    • 一个列族就划分成一个 store;
    • 如果一个表中只有一个列族,那么这个表的每一个 region 中只有一个 store;
    • 如果一个表中有 N 个列族,那么这个表的每一个 region 中就有 N 个 store;
  • 一个 store 里面只有一个 memstore,memestore 是一块内存区域,写入数据会先写入 memstore 进行缓冲,然后再把数据刷到磁盘;
  • 一个 store 里面有很多 storeFile,最后数据是以很多 HFile 这种数据结构的文件保存在 HDFS 上
    • StoreFile 是 HFile 的抽象对象,如果说到 StoreFile 就等于 HFile;
    • 每次 memstore 刷写数据到磁盘,就生成对应的一个新的 HFile 文件出来。

Hbase读数据流程
  • 1、客户端与 ZK 进行连接:

    • 从 ZK 找到 meta 表的 region 位置,即 meta 表的数据(在 HRegionServer 上);
    • 客户端与此 HRegionServer 建立连接,然后读取 meta 表中的数据;
    • meta 表中存储了所有用户表的 region 信息,我们可以通过 scan 'hbase:meta' 来查看 meta 表的信息;

    Hbase集群,只有一张meta表,此表只有一个region,该region数据保存在一个HRegionServer上

  • 2、根据要查询的 namespace、表名和 rowkey 信息,找到写入数据对应的 region 信息;

  • 3、找到这个 region 对应的 regionserver,然后发送请求;

  • 4、查找并定位到对应的 region;

  • 5、先从 memstore 查找数据,如果没有,再从 BlockCache 上读取:

    • Hbase 上 HRegionServer 的内存分为两部分:一部分作为 MemStore,主要用来写;另外一部分作为 BlockCache,主要用于读数据;
  • 6、如果 BlockCache 中也没有找到,再到 StoreFile 上进行读取:

    • 从 StoreFile 中读取到数据之后,不是直接把结果返回给客户端,而是把数据先写入到 BlockCache 中,目的是为了加快后续的查询,然后返回结果给客户端。
Hbase写数据流程
  • 1、客户端首先从 ZK 找到 meta 表的 region 位置,然后读取 meta 表中的数据,meta 表中存储了用户表的 region 信息;
  • 2、根据 namespace、表名和 rowkey 信息,找到写入数据对应的 region 信息;
  • 3、找到这个 region 对应的 regionserver,然后发送请求;
  • 4、把数据分别写到 HLog(write ahead log)和 memstore 各一份;
  • 5、memstore 达到阈值后把数据刷到磁盘,生成 storeFile 文件;

HLog:也称为WAL意为Write ahead log,类似mysql中的binlog,用来做灾难恢复时用,HLog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。

Hbase的flush、compact机制

Flush触发条件 1. MemStore级别限制
  • 当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发 MemStore 刷新。

	hbase.hregion.memstore.flush.size
	134217728

2. Region级别限制
  • 当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 2 * 128M = 256M),会触发 MemStore 刷新。

	hbase.hregion.memstore.flush.size
	134217728


	hbase.hregion.memstore.block.multiplier
	4
   
3. HRegionServer级别限制
  • 当一个 Region Server 中所有 Memstore 的大小总和超过低水位阈值hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size(前者默认值0.95),RegionServer 开始强制 flush;
  • 先 Flush MemStore 最大的Region,再执行次大的,依次执行;
  • 如写入速度大于 flush 写出的速度,导致总 MemStore 大小超过高水位阈值hbase.regionserver.global.memstore.size(默认为JVM内存的40%),此时 RegionServer 会阻塞更新并强制执行 flush,直到总 MemStore 大小低于低水位阈值。

	hbase.regionserver.global.memstore.size.lower.limit
	0.95


	hbase.regionserver.global.memstore.size
	0.4

4. HLog数量上限
  • 当一个Region Server中 HLog 数量达到上限(可通过参数 hbase.regionserver.maxlogs 配置)时,系统会选取最早的一个 HLog 对应的一个或多个 Region 进行 flush。
5. 定期刷新MemStore
  • 默认周期为1小时,确保 MemStore 不会长时间没有持久化。为避免所有的 MemStore 在同一时间都进行flush导致的问题,定期的 flush 操作有 20000ms 左右的随机延时。
6. 手动Flush
  • 用户可以通过shell命令flush 'tablename'或者flush 'region name'分别对一个表或者一个 Region 进行flush。
Flush流程

为了减少flush过程对读写的影响,将整个flush过程分为三个阶段

  • prepare 阶段:
    • 遍历当前 Region 中所有的 MemStore,将 MemStore 中当前数据集 CellSkipListSet 做一个快照 Snapshot,然后再新建一个 CellSkipListSet。后续写入的数据都会写入到新的 CellSkipListSet 中。
    • prepare 阶段需要加一把 updateLock 对写请求阻塞,结束之后释放该锁,因此此阶段没有任何费时操作,因此持锁时间很短。
  • flush 阶段:
    • 遍历所有 MemStore,将 prepare 阶段生成的 snapshot 持久化为临时文件,临时文件会统一放到目录 .tmp 下,这个过程因为涉及到磁盘 IO 操作,因此相对比较耗时。
  • commit 阶段:
    • 遍历所有的 MemStore,将 flush 阶段生成的临时文件移到指定的 ColumnFamily 目录下,针对 HFile 生成对应的 StoreFile 和 Reader,把 StoreFile 添加到 HStore 的 storeFiles 列表中,最后再清空 prepare 阶段生成的 snapshot。
Compact合并机制
  • hbase为了防止小文件过多,以保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction。
1. minor compaction 小合并
  • 在将Store中多个HFile合并为一个HFile,在这个过程中会选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,这种合并的触发频率很高。
  • minor compaction触发条件由以下几个参数共同决定:

	hbase.hstore.compactionThreshold
	3




	hbase.hstore.compaction.max
	10




	hbase.hstore.compaction.min.size
	134217728




	hbase.hstore.compaction.max.size
	9223372036854775807

2. major compaction 大合并
  • 合并Store中所有的HFile为一个HFile,将所有的StoreFile合并成一个StoreFile,这个过程还会清理三类无意义数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。合并频率比较低,默认7天执行一次,并且性能消耗非常大,建议生产关闭(设置为0),在应用空闲时间手动触发。一般可以是手动控制进行合并,防止出现在业务高峰期。
  • major compaction触发时间条件:

	hbase.hregion.majorcompaction
	604800000

  • 手动触发:
# 使用major_compact命令
major_compact tableName 
Hbase表的预分区
  • 当一个table刚被创建的时候,Hbase默认的分配一个region给table。也就是说这个时候,所有的读写请求都会访问到同一个regionServer的同一个region中,这个时候就达不到负载均衡的效果了,集群中的其他regionServer就可能会处于比较空闲的状态。
  • 解决这个问题可以用 pre-splitting,在创建table的时候就配置好,生成多个region。
为何要预分区?
  • 增加数据读写效率
  • 负载均衡,防止数据倾斜
  • 方便集群容灾调度region
  • 优化Map数量
预分区原理
  • 每一个 region 维护着 startRowKey 与 endRowKey,如果加入的数据符合某个 region 维护的 rowKey 范围,则该数据交给这个 region 维护。
手动指定预分区 方式一
create 'person','info1','info2',SPLITS => ['1000','2000','3000','4000']

  • http://node01:60010/table.jsp?name=person
方式二
  • 也可以把分区规则创建于文件中
vim /bigdata/install/split.txt
# 文件内容
aaa
bbb
ccc
ddd
  • hbase shell中,执行命令
create 'student','info',SPLITS_FILE => '/bigdata/install/split.txt'
  • 成功后查看web界面:http://node01:60010/table.jsp?name=student

方式三
  • HexStringSplit 算法:会将数据从“00000000”到“FFFFFFFF”之间的数据长度按照n等分之后算出每一段的起始rowkey和结束rowkey,以此作为拆分点。
  • 例如:
create 'mytable', 'base_info',' extra_info', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
  • http://node01:60010/table.jsp?name=mytable
Region 合并 region 合并说明
  • Region的合并不是为了性能, 而是出于便于运维的目的;
  • 比如删除了大量的数据,这个时候每个Region都变得很小 ,储多个Region就浪费了,这个时候可以把Region合并起来,进而可以减少一些Region服务器节点。
如何进行region合并 1. 通过Merge类冷合并Region
  • 执行合并前,需要先关闭hbase集群
  • 创建一张hbase表:
create 'test','info1',SPLITS => ['1000','2000','3000']
  • 查看表region:http://node01:60010/table.jsp?name=test

  • 需求:把 test 表中的2个 region 数据进行合并:
    test,1639907811059.ecf5efacd44e3562da5e5da5623f4d65.
    test,1000,1639907811059.e5d4094ab6680946d4fb6f21051c7b86.

  • 这里通过 org.apache.hadoop.hbase.util.Merge 类来实现,不需要进入hbase shell,直接执行(需要先关闭hbase集群):

hbase org.apache.hadoop.hbase.util.Merge test test,,1639907811059.ecf5efacd44e3562da5e5da5623f4d65. test,1000,1639907811059.e5d4094ab6680946d4fb6f21051c7b86.
  • 可能会遇到如下报错:原因是我们使用的 hbase版本是 2.2.6 已经将不推荐这种方式了, org.apache.hadoop.hbase.util.Merge 这个类也被删除了。
错误: 找不到或无法加载主类 org.apache.hadoop.hbase.util.Merge
2. 通过online_merge热合并Region
  • 不需要关闭hbase集群,在线进行合并
  • 与冷合并不同的是,online_merge的传参是Region的hash值,而Region的hash值就是Region名称的最后那段在两个.之间的字符串部分。
  • 需要进入hbase shell 执行命名:
merge_region 'ecf5efacd44e3562da5e5da5623f4d65', 'e5d4094ab6680946d4fb6f21051c7b86'
  • 执行命令成功后,到web页面查看:
Hbase集成MapReduce
  • Hbase表中的数据最终都是存储在HDFS上,Hbase天生的支持MR的操作,我们可以通过MR直接处理Hbase表中的数据,并且MR可以将处理后的结果直接存储到Hbase表中。
  • 参考地址:http://hbase.apache.org/book.html#mapreduce
实战一 1. 需求
  • 读取 Hbase 当中 myuser 这张表的 f1:name、f1:age 数据,将数据写入到另外一张 myuser2 表的 f1 列族里面去
2. 实现
  • 第一步::创建 myuser2 这张 hbase 表,列族的名字与 myuser 表的列族名字相同
create 'myuser2','f1'
  • 第二步:创建Maven工程,并导入 jar 包依赖,主要需要导 hbase-mapreduce 的依赖

    org.apache.hbase
    hbase-mapreduce
    2.2.6

  • 第三步:开发 MR 程序实现功能
public class Hbase2HbaseMain extends Configured implements Tool {
    private static class HbaseReadMapper extends TableMapper {
        
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] rowKeyBytes = key.get();
            // 输出数据 => 写数据 => Put
            // 构建Put对象
            Put put = new Put(rowKeyBytes);
            // 获取一行中所有的 Cell 对象
            Cell[] cells = value.rawCells();
            // 将f1 : name& age输出
            for (Cell cell : cells) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                if (!"f1".equals(family)) continue;
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if (StringUtils.equalsAny(qualifier, "name", "age")) put.add(cell);
            }
            if (!put.isEmpty()) context.write(new Text(Bytes.toString(rowKeyBytes)), put);
        }
    }

    private static class HbaseWriteReducer extends TableReducer {
        
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            ImmutableBytesWritable writable = new ImmutableBytesWritable();
            // rowKey
            writable.set(key.toString().getBytes());

            // 遍历 put 对象,并输出
            for (Put put : values) context.write(writable, put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(Hbase2HbaseMain.class);

        // Mapper
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), new Scan(),
                HbaseReadMapper.class, Text.class, Put.class, job);
        // Reducer
        TableMapReduceUtil.initTableReducerJob("myuser2", HbaseWriteReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HbaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        int run = ToolRunner.run(configuration, new Hbase2HbaseMain(), args);
        System.exit(run);
    }
}
  • 运行程序成功后,在 hbase shell 查看输出结果:

实战二:hdfs => hbase 1. 需求
  • 读取hdfs上面的数据,写入到hbase表里面去,node03执行以下命令准备数据文件,并将数据文件上传到HDFS上面去
  • 在 /bigdata/install/hbasedatas 目录,创建 user.txt 文件,内容如下:
0007	zhangsan	18
0008	lisi		25
0009	wangwu		20
  • 将文件上传到hdfs的路径下面去
hdfs dfs -mkdir -p /hbase/input
hdfs dfs -put /bigdata/install/hbasedatas/user.txt /hbase/input/

2. 编码实现
public class Hdfs2HbaseMain extends Configured implements Tool {
    private static class HdfsReadMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 数据原样输出
            context.write(value, NullWritable.get());
        }
    }

    private static class HbaseWriteReducer extends TableReducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            String[] slices = key.toString().split("t");
            Put put = new Put(Bytes.toBytes(slices[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), slices[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), slices[2].getBytes());

            context.write(new ImmutableBytesWritable(Bytes.toBytes(slices[0])), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(Hdfs2HbaseMain.class);

        // 输入文件路径
        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));

        job.setMapperClass(HdfsReadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 指定输出到hbase表名
        TableMapReduceUtil.initTableReducerJob("myuser2", HbaseWriteReducer.class, job);

        // 设置reduce个数
        job.setNumReduceTasks(1);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HbaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        int run = ToolRunner.run(configuration, new Hdfs2HbaseMain(), args);
        System.exit(run);
    }
}

实战三:bulkload 1. 需求
  • 通过bulkload的方式批量加载数据到Hbase表中;
  • 将我们 hdfs 上面的这个路径 /hbase/input/user.txt 的数据文件,转换成 HFile 格式,然后 load 到 myuser2 这张表里面去
    • 加载数据到Hbase当中去的方式多种多样,我们可以使用 Hbase 的 javaAPI 或者使用 sqoop 将我们的数据写入或者导入到Hbase当中去,但是这些方式不是最佳的,因为在导入的过程中占用 Region 资源导致效率低下;
    • 我们也可以通过MR的程序,将我们的数据直接转换成 Hbase 的最终存储格式HFile,然后直接 load 数据到Hbase当中去即可
2. 实现原理
  • Hbase数据正常写流程回顾
  • bulkload方式的处理示意图

  • 好处
    • 导入过程不占用Region资源
    • 能快速导入海量的数据
    • 节省内存
3. 编码实现
  • 开发生成HFile文件的代码:
public class BulkLoadMain extends Configured implements Tool {
    private static class BulkLoadMapper extends Mapper {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] slices = value.toString().split("t");
            Put put = new Put(slices[0].getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), slices[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), slices[2].getBytes());

            context.write(new ImmutableBytesWritable(slices[0].getBytes()), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(BulkLoadMain.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        // 使MR可以向myuser2表中,增量增加数据
        HFileOutputFormat2.configureIncrementalLoad(job, table,
                connection.getRegionLocator(TableName.valueOf("myuser2")));
        // 数据写回到HDFS,写成HFile -> 所以指定输出格式为HFileOutputFormat2
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/out_hfile"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HbaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        int run = ToolRunner.run(configuration, new BulkLoadMain(), args);
        System.exit(run);
    }
}
  • 查看HDFS上输出的结果:
  • 加载HFile文件到hbase表中
public class LoadData {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HbaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        BulkLoadHFiles load = BulkLoadHFiles.create(configuration);
        load.bulkLoad(TableName.valueOf("myuser2"),
                new Path("hdfs://node01:8020/hbase/out_hfile"));
    }
}
Hbase集成Hive
  • Hive提供了与Hbase的集成,使得能够在Hbase表上使用hive sql 语句进行查询、插入操作以及进行Join和Union等复杂查询,同时也可以将hive表中的数据映射到Hbase中
Hbase与Hive的对比
HbaseHive
数据库管理系统
是一种面向列存储的非关系型数据库。
数据仓库管理系统
Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。
用于存储结构化和非结构话的数据
适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。
用于数据分析、清洗
Hive适用于离线的数据分析和清洗,延迟较高
基于HDFS
数据持久化存储的体现形式是Hfile,存放于DataNode中,被ResionServer以region的形式进行管理。
基于HDFS、MapReduce(或者其他计算引擎如:Tez、Spark)
Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。
延迟较低,接入在线业务使用
面对大量的企业数据,Hbase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。
  • Hive和Hbase是两种基于Hadoop的不同技术,Hive是一种类SQL的引擎,并且运行MapReduce任务,Hbase是一种在Hadoop之上的NoSQL 的Key/vale数据库。这两种工具是可以同时使用的。就像用Google来搜索,用FaceBook进行社交一样,Hive可以用来进行统计查询,Hbase可以用来进行实时查询,数据也可以从Hive写到Hbase,或者从Hbase写回Hive。
整合配置 1. 拷贝jar包
  • 把hive安装目录的lib文件夹下的hbase开头的jar包删除,然后hbase安装目录下lib下hbase开头的几个jar拷贝到hive的lib文件夹下
rm /bigdata/install/hive-3.1.2/lib/hbase*
cp /bigdata/install/hbase-2.2.6/lib/hbase* /bigdata/install/hive-3.1.2/lib
2. 修改hive配置文件
  • 编辑node03服务器上面的hive的配置文件hive-site.xml,vim conf/hive-site.xml 添加以下两个属性的配置:

    hive.zookeeper.quorum
    node01,node02,node03


    hbase.zookeeper.quorum
    node01,node02,node03

3. 修改hive-env.sh配置文件
  • vim conf/hive-env.sh 添加以下配置
export HADOOP_HOME=/bigdata/install/hadoop-3.1.4
export Hbase_HOME=/bigdata/install/hbase-2.2.6
export HIVE_CONF_DIR=/bigdata/install/hive-3.1.2/conf
需求一:将hive表当中分析的结果保存到hbase表当中 1. hive中创建表
  • node03 进入hive客户端,执行以下命令创建hive表
hive

create database course;
use course;

create external table if not exists course.score(id int, cname string, score int) 
row format delimited fields terminated by 't' stored as textfile ;
2. 准备数据
  • 数据文件 /bigdata/install/hivedatas/hive-hbase.txt
1       zhangsan        80
2       lisi            60
3       wangwu          30
4       zhaoliu         70
  • 进入hive客户端进行加载数据:
load data local inpath '/bigdata/install/hivedatas/hive-hbase.txt' into table score;
select * from score;
3. 创建hive管理表与Hbase进行映射
  • 我们可以创建一个hive的管理表与hbase当中的表进行映射,hive管理表当中的数据,都会存储到hbase上面去
  • hive当中创建内部表
create table course.hbase_score(id int,cname string,score int) 
stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler'  
with serdeproperties("hbase.columns.mapping" = "cf:name,cf:score") tblproperties("hbase.table.name" = "hbase_score");
  • 通过 insert overwrite select 插入数据
insert overwrite table course.hbase_score select id,cname,score from course.score;
4. hbase当中查看表hbase_score
  • 进入hbase的客户端查看表hbase_score,并查看当中的数据
list
scan 'hbase_score'

需求二:创建hive外部表,映射Hbase当中已有的表模型 1. Hbase当中创建表并手动插入加载一些数据
  • 进入Hbase的shell客户端,手动创建一张表,并插入加载一些数据进去
create 'hbase_hive_score', { NAME =>'cf'}
put 'hbase_hive_score','1','cf:name','zhangsan'
put 'hbase_hive_score','1','cf:score', '95'
put 'hbase_hive_score','2','cf:name','lisi'
put 'hbase_hive_score','2','cf:score', '96'
put 'hbase_hive_score','3','cf:name','wangwu'
put 'hbase_hive_score','3','cf:score', '97'
2. 建立hive的外部表,映射Hbase当中的表以及字段
  • 在hive当中建立外部表,进入hive客户端,然后执行以下命令进行创建hive外部表,就可以实现映射Hbase当中的表数据
CREATE external TABLE course.hbase2hive(id int, name string, score int) 
STORED BY 'org.apache.hadoop.hive.hbase.HbaseStorageHandler' 
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:score") TBLPROPERTIES("hbase.table.name" ="hbase_hive_score");
  • 查看hive表course.hbase2hive
select * from course.hbase2hive;

Hbase表的rowkey设计 rowkey长度原则
  • rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10-100bytes,以byte[]形式保存,一般设计成定长。
  • 建议尽可能短,但是也不能太短,否则rowkey前缀重复的概率增大。
  • 设计过长会降低memstore内存的利用率和HFile存储数据的效率。
rowkey散列原则
  • 建议将rowkey的高位作为散列字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。
  • 如果没有散列字段,首字段直接是时间信息。所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
rowkey唯一原则
  • 必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的。
  • 因此,设计rowkey的时候,要充分利用这个排序的特点,可以将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。
Hbase表的热点 什么是热点?
  • 检索habse的记录首先要通过rowkey来定位数据行。
  • 当大量的client访问hbase集群的一个或少数几个节点,造成少数region server的读/写请求过多、负载过大,而其他region server负载却很小,就造成了“热点”现象。
热点的解决方案 1. 预分区
  • 预分区的目的让表的数据可以均衡的分散在集群中,而不是默认只有一个region分布在集群的一个节点上。
2. 加盐
  • 这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同
3. 哈希
  • 哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据。
rowkey=MD5(username).subString(0,10)+时间戳	
4. 反转
  • 反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。
  • 这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。
电信公司:
移动-----------> 136xxxx9301  ----->1039xxxx631
				136xxxx1234  
				136xxxx2341 
电信
联通

user表
rowkey    name    	age   	sex    	  	address
		  lisi1    	21     	m       	beijing
		  lisi2    	22     	m       	beijing
		  lisi3    	25     	m       	beijing
		  lisi4    	30     	m       	beijing
		  lisi5    	40     	f       	shanghai
		  lisi6    	50     	f       	tianjin
	          
需求:后期想经常按照居住地和年龄进行查询?	
rowkey = address+age+随机数
         beijing21+随机数
         beijing22+随机数
         beijing25+随机数
         beijing30+随机数
   
rowkey = address+age+随机数
region分裂策略
  • region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。
  • 当region过大的时候,hbase会将region拆分为两个region,这也是Hbase的一个优点。
  • Hbase的region split策略一共有以下6种
ConstantSizeRegionSplitPolicy
  • 0.94版本前,Hbase region的默认切分策略

  • 当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。

  • 但是在生产线上这种切分策略却有相当大的弊端:

    • 切分策略对于大表和小表没有明显的区分。
    • 阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。
    • 如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。
IncreasingToUpperBoundRegionSplitPolicy
  • 0.94版本~2.0版本默认切分策略

  • 总体看和ConstantSizeRegionSplitPolicy思路相同

    • 一个region中最大的store大小大于设置阈值就会触发切分。
    • 但是这个阈值并不像 ConstantSizeRegionSplitPolicy 是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系。
  • region split阈值的计算公式是:

    • 设 regioncount 是region所属表在当前regionserver上的region的个数
    • 阈值 = $ regioncount^3 * 128M * 2$,当然阈值并不会无限增长,最大不超过 MaxRegionFileSize(10G);当region中最大的store的大小达到该阈值的时候进行region split
  • 例如:

    • 第一次split阈值 = 13 * 256 = 256MB
    • 第二次split阈值 = 23 * 256 = 2048MB
    • 第三次split阈值 = 33 * 256 = 6912MB
    • 第四次split阈值 = 43 * 256 = 16384MB > 10GB,因此取较小的值10GB
    • 后面每次split的size都是10GB了
  • 特点

    • 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;
    • 在集群规模比较大的情况下,对大表的表现比较优秀;
    • 但是,它并不完美,小表可能产生大量的小region,分散在各regionserver上。
SteppingSplitPolicy
  • 2.0版本默认切分策略
  • 相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些
  • region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系
    • 如果region个数等于1,切分阈值为flush size 128M * 2
    • 否则为 MaxRegionFileSize。
  • 这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。
KeyPrefixRegionSplitPolicy
  • 根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。
DelimitedKeyPrefixRegionSplitPolicy
  • 保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。
DisabledRegionSplitPolicy
  • 不启用自动拆分, 需要指定手动拆分。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677228.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号