栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC HBase项目分析:compact流程分析(一)

2021SC@SDUSC HBase项目分析:compact流程分析(一)

2021SC@SDUSC

目录

Compact介绍

Compact定义

Compact分类

compact源码分析


Compact介绍 Compact定义

Compact是指Hbase表中HRegion上某个Column Family下,部分或全部HFiles的合并。它是由于数据在持续写入后,MemStore达到一定阈值,被flush到磁盘生成hfile,随着hfile的逐渐增多,如果不做处理,将会严重影响Hbase数据读取的效率。所以,在Hbase系统内部,需要定期在满足一定条件的情况下,或者人为手动触发,将这许多文件合并成一个大文件,即compact。

Compact分类

Compact分为两种:Minor Compact、Major Compact。

Minor Compact:选择部分小的、相邻的hfile合并为一个大文件;

Major Compat:将store中所有的hfile文件合并成一个大的文件,并且这个阶段会将过期的数据、已删除的数据和超出版本的数据进行物理上的清除。

compact源码分析

compact的发起方法是HRegion的compact()方法,HRegion的compact()方法:

  1. 首先确认合并上下文不为空,即合并请求request不为空,且请求request中所包含的文件不为空;
  2. 如果Region正在下线或者已经下线,记录日志,取消合并请求,返回false,取消合并调用的是Store的cancelRequestedCompaction()方法;
  3. 设置一个标志位:requestNeedsCancellation,表示是否需要撤销合并请求;
  4. 获取HRegion上lock的读锁;
  5. 从store中获取列族名;
  6. 根据列族名从stores中获取store,如果其和传入的store不相等,则记录warn日志,并返回false。此时,对应store已在该HRegion上重新初始化,我们要取消此次合并请求;
  7. 获取任务状态监控器,并记录状态:Compacting storename in regionname;
  8. 如果Region已下线,跳过合并,返回false;
  9. 设置一个状态位wasStateSet;
  10. 判断writestate,确认Region可写,并累加合并正在进行的数目;
  11. 任务状态监控器记录状态:Compacting store storename;
  12. 标志位requestNeedsCancellation设置为false,表示此时compact可以真正执行;
  13. 调用store的compact方法,执行合并;
  14. 如果标志位wasStateSet为true,则合并正在进行的数目writestate.compacting减一;并且如果没有合并在进行,则唤醒其他阻塞线程
  15. 任务状态监控器记录状态:Compaction complete;
  16. 返回处理结果true;
  17. 在finally中需要做如下处理:a)如果需要取消合并,调用Store的cancelRequestedCompaction()方法取消合并;b)清空状态跟踪器;c)释放读锁。
  public boolean compact(CompactionContext compaction, Store store,
                         CompactionThroughputController throughputController, User user) throws IOException {
    //确认合并上下文不为空,即合并请求request不为空,且请求request中所包含的文件不为空
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    //如果Region正在下线或者已经下线,记录日志,并取消合并请求,返回false
    //取消合并调用的是Store的cancelRequestedCompaction()方法
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    //任务状态监控器
    MonitoredTask status = null;
    //标志位,请求需要撤销
    boolean requestNeedsCancellation = true;
    //阻塞,等待合并的锁
    //这个lock锁是Region行为上的一个读写锁,加上这个锁,控制Region的整体行为,比如flush、compact、close等
    //flush和compact使用的是读锁,是一个共享锁,意味着flush和compact可以同步进行,但是不能执行close,因为close是写锁,
    //这是一个独占锁,一旦被占用,其他线程就不能发起flush、compact等操作
    lock.readLock().lock();
    try {
      //获取列族名
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      //如果根据列族名从stores中获取的store,和传入的store不相等,则记录warn日志,并返回false
      if (stores.get(cf) != store) {
        //此时,对应store已在该HRegion上被重新初始化,我们要取消此次合并请求,这种情况可能是由于分裂事务回滚时造成的
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
                + " has been re-instantiated, cancel this compaction request. "
                + " It may be caused by the roll back of split transaction");
        return false;
      }
      //任务状态监控器记录状态:Compacting storename in regionname
      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      //如果Region已下线,跳过合并
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      //状态位
      boolean wasStateSet = false;
      try {
        //判断writestate,确认Region可写,并累加合并正在进行的数目
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            //如果Region可写,累加合并进行的数目,标志位wasStateSet设置为true
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            //如果Region不可写,记录log信息,舍弃该状态
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
                + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          //任务状态监控器记录状态:Compacting store storename
          status.setStatus("Compacting store " + store);
          //标志位requestNeedsCancellation设置为false,说明此时compact可以真正执行
          requestNeedsCancellation = false;
          //调用store的compact方法,执行合并
          store.compact(compaction, throughputController, user);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          //如果合并正在进行的数目已经累加,合并正在进行的数目减一
          synchronized (writestate) {
            --writestate.compacting;
            //如果没有合并在进行,唤醒其他阻塞线程
            if (writestate.compacting <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      //任务状态监控器记录状态:合并完成
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
    	// 如果需要取消合并,调用Store的cancelRequestedCompaction()方法取消合并
        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
        //清空状态跟踪器
        if (status != null) status.cleanup();
      } finally {
        //释放读锁
        lock.readLock().unlock();
      }
    }
  }

再来看一下上述方法的一个重要参数CompactionContext类型的compaction

CompactionContext是合并的上下文类。该类含有运行一个合并所必需的全部“物理”细节,其内部包含一个合并请求CompactionRequest类型的request,同时包含针对该请求的判断、获取、赋值方法,相关代码如下

  //设置合并请求
  public void forceSelect(CompactionRequest request) {
    this.request = request;
  }
  //获取合并请求
  public CompactionRequest getRequest() {
    assert hasSelection();
    return this.request;
  }
  //判断合并请求是否为空,意思也就是是否已选择文件
  public boolean hasSelection() {
    return this.request != null;
  }

同时CompactionContext本身是一个抽象类,它有两个子类,分别为:位于DefaultStoreEngine中的DefaultCompactionContext和位于StripeStoreEngine中的StripeCompaction;

这两个CompactionContext的子类都分别实现了用于选择需要合并文件的select()方法、在已选择文件的基础上执行合并的compact()方法,有关这两个类会在后续博客中详细分析。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583121.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号