栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC hbase代码分析(六)写入流程(4)

2021SC@SDUSC hbase代码分析(六)写入流程(4)

2021SC@SDUSC hbase代码分析(六) 写入流程

2021SC@SDUSC 2021SC@SDUSC

2021SC@SDUSC 2021SC@SDUSC

接上次博客 (2)随机写入MemStore

KeyValue写入Rgion分为两步:首先追加到HLog,再写入MemStore。MemStore使用数据结构ConcurrentSkipListMap来实际存储KeyValue.优点是能够非常友好地支持大规模并发写入,同时跳跃表本身是有序存储的,这有利于数据有序落盘,并且有利于提升MemStore中的KeyValue查找性能。

ConcurrentSkipListMap初始化函数:

private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex(new Node(null, base_HEADER, null),
                              null, null, 1);
}

MemStore的写入流程可以表述为以下3步。

  1. 检查当前可用的Chunk是否写满,如果写满,重新申请一个2M的Chunk。
  2. 将当前KeyValue在内存中重新构建,在可用Chunk的指定offset处申请内存创建一个新的KeyValue对象。
  3. 将新创建的KeyValue对象写入ConcurrentSkipListMap中。
MemStore Fush阶段

Hbase中,Region是集群节点上最小的数据服务单元,用户数据表由一个或多个Region组成。在Region中每个ColumnFamily的数据组成一个Store。每个Store由一个Memstore和多个HFile组成。

HFile是由Memstore flush产生的,每一次的flush都会产生一个全新的HFile文件。

1.触发条件

Hbase会在以下几种情况下触发MemStore Flush:

  1. MemStore级别限制:当Region中任意一个MemStore的大小达到了上限hbase.hregion.memstore.flush.size(默认是134217728,即128MB),会触发MemStore刷写,生成一个HFile。

    public static final String HREGION_MEMSTORE_FLUSH_SIZE =
        "hbase.hregion.memstore.flush.size";
    
    hbase.hregion.memstore.flush.size
    134217728
    
  2. Region级别限制:当Region中所有MemStore的大小总和达到了上限,会触发MemStore刷新。

  3. RegionServer级别限制:当整个RegionServer中所有写入MemStore的数据大小总和超过低水位阈值,RegionServer开始强制执行flush,先flush MemStore最大的Region,再flush次大的,依次执行。
    如果此时写入吞吐量依然很高,导致总MemStore的数据大小超过高水位阈值 hbase.regionserver.global.memstore.size,RegionServer会阻塞所有写入请求并强制执行flush,直至总MemStore大小下降到低水位阈值。
    一旦出现RegionServer写入出现阻塞,查看日志中是否存在关键字Blocking updates on,如果存在说明当前RegionServer总MemStore内存大小超过了高水位阀值。

    public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
    public static final String MEMSTORE_SIZE_OLD_KEY =
        "hbase.regionserver.global.memstore.upperLimit";
    public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
        "hbase.regionserver.global.memstore.size.lower.limit";
    public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
        "hbase.regionserver.global.memstore.lowerLimit";
    
  4. 当一个RegionServer中HLog数量达到上限:系统会选取最早的HLog对应的一个或多个Region进行flush。

  5. Hbase定期刷新MemStore:时间间隔的配置项是:hbase.regionserver.optionalcacheflushinterval,默认值为3600000,即1个小时。如果设定为0,则意味着关闭定时自动刷写。

    
      hbase.regionserver.optionalcacheflushinterval
      3600000
      
      Maximum amount of time an edit lives in memory before being automatically flushed.
      Default 1 hour. Set it to 0 to disable automatic flushing.
    
    
    
  6. 手动执行flush:Admin接口也提供了方法来手动触发Memstore的刷写、hbase shell来手动触发Memstore的刷写。

    • flush(TableName tableName):对单个表进行刷写。
    • flushRegion(byte[] regionName):对单个Region进行刷写。
2.执行流程

为了减少flush过程对读写的影响,Hbase采用了类似两阶段提交的方式,将整个flush过程分为三个阶段。

  1. prepare阶段:遍历当前Region中的所有MemStore,将MemStore中当前数据集ConcurrentSkipListMap做一个快照snapshot,然后再新建一个CellSkipListSet接收新的数据写入。prepare阶段需要添加updateLock对写请求阻塞,结束之后会释放该锁。因为此阶段没有任何费时操作,因此持锁时间很短。

      protected KeyValueScanner pollRealKV() throws IOException {
        KeyValueScanner kvScanner = heap.poll();
        if (kvScanner == null) {
          return null;
        }
    
        while (kvScanner != null && !kvScanner.realSeekDone()) {
          if (kvScanner.peek() != null) {
            try {
              kvScanner.enforceSeek();
            } catch (IOException ioe) {
              // Add the item to delayed close set in case it is leak from close
              this.scannersForDelayedClose.add(kvScanner);
              throw ioe;
            }
            Cell curKV = kvScanner.peek();
            if (curKV != null) {
              KeyValueScanner nextEarliestScanner = heap.peek();
              if (nextEarliestScanner == null) {
                // The heap is empty. Return the only possible scanner.
                return kvScanner;
              }
    
              Cell nextKV = nextEarliestScanner.peek();
              if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
                return kvScanner;
              }
              heap.add(kvScanner);
            } else {
              this.scannersForDelayedClose.add(kvScanner);
            }
          } else {
            this.scannersForDelayedClose.add(kvScanner);
          }
          kvScanner = heap.poll();
        }
    
        return kvScanner;
      }
    
  2. flush阶段:遍历所有MemStore,将prepare阶段生成的snapshot持久化为临时文件,临时文件会统一放到region目录下的.tmp路径。这个过程因为涉及磁盘IO操作,因此相对比较耗时。

  3. commit阶段:遍历所有的MemStore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下,针对HFile生成对应的storefile和Reader,把storefile添加到Store的storefiles列表中,最后再清空prepare阶段生成的snapshot。

    HFileContext(boolean useHbaseChecksum, boolean includesMvcc, boolean includesTags,
                 Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
                 int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
                 Encryption.Context cryptoContext, long fileCreateTime, String hfileName,
                 byte[] columnFamily, byte[] tableName, CellComparator cellComparator) {
      this.usesHbaseChecksum = useHbaseChecksum;
      this.includesMvcc =  includesMvcc;
      this.includesTags = includesTags;
      this.compressAlgo = compressAlgo;
      this.compressTags = compressTags;
      this.checksumType = checksumType;
      this.bytesPerChecksum = bytesPerChecksum;
      this.blocksize = blockSize;
      if (encoding != null) {
        this.encoding = encoding;
      }
      this.cryptoContext = cryptoContext;
      this.fileCreateTime = fileCreateTime;
      this.hfileName = hfileName;
      this.columnFamily = columnFamily;
      this.tableName = tableName;
      // If no cellComparator specified, make a guess based off tablename. If hbase:meta, then should
      // be the meta table comparator. Comparators are per table.
      this.cellComparator = cellComparator != null ? cellComparator : this.tableName != null ?
        CellComparatorImpl.getCellComparator(this.tableName) : CellComparator.getInstance();
    }
    
3.生成HFile

Hbase执行Flush操作之后将内存中的数据按照特定格式写成HFile文件。

该部分会再后续博客中分析。。。

至此写入流程大体结束
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422854.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号