栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC HBase项目分析:compact流程分析(六)

2021SC@SDUSC HBase项目分析:compact流程分析(六)

2021SC@SDUSC

目录

Compact回顾

Compact发起时机


2021SC@SDUSC 

过去的五篇里分析了compact的流程,从这篇开始分析compact的发起时机、判断条件。

Compact回顾

先来回顾一下compact的定义与作用。Compact是指Hbase表中HRegion上某个Column Family下,部分或全部HFiles的合并。它是由于数据在持续写入后,MemStore达到一定阈值,被flush到磁盘生成hfile,随着hfile的逐渐增多,如果不做处理,将会严重影响Hbase数据读取的效率。所以,在Hbase系统内部,需要定期在满足一定条件的情况下,或者人为手动触发,将这许多文件合并成一个大文件,即compact。Compact分为两种:Minor Compact、Major Compact。Minor Compact是指选择部分小的、相邻的hfile合并为一个大文件;Major Compat是指将store中所有的hfile文件合并成一个大的文件,并且这个阶段会将过期的数据、已删除的数据和超出版本的数据进行物理上的清除。

Compact发起时机

在HRegionServer内有一个成员变量Chore compactionChecker,compactionChecker负责检查compact请求,查看Chore的定义:可以看出Chore继承自HasThread类,HasThread类是一个实现了Runnable接口的抽象类,并且定义了一个抽象的run()方法。所以,Chore是一个线程。再结合源码中给的注释,可以看出 a)Chore是定期在Hbase中执行的一个任务;b)Chore在它所在的线程内执行;c)这个基础抽象类提供了loop循环和sleep机制。

public abstract class Chore extends HasThread {
}

再来查看一下Chore的成员变量:Chore提供的sleep机制就是依靠sleeper来实现的,而stopper可以是任何实现了Stoppable接口的类的实例

  private final Log LOG = LogFactory.getLog(this.getClass());
  private final Sleeper sleeper;
  protected final Stoppable stopper;

 再来看一下Chore的构造函数:可以看到Chore的构造函数的构造函数需要String name, final int p, final Stoppable stopper三个参数。其中name是Chore的名称,而p和stopper用来构造一个sleeper

  public Chore(String name, final int p, final Stoppable stopper) {
    super(name);
    if (stopper == null){
      throw new NullPointerException("stopper cannot be null");
    }
    this.sleeper = new Sleeper(p, stopper);
    this.stopper = stopper;
  }

接下来看一下Sleeper类,首先查看Sleeper类的成员变量:其中period表示睡眠周期,通过上述构造方法中的p赋值;sleepLock是一个Object对象,依靠它的wait()方法,可以实现对象等待一段时间;triggerWake是一个标志位,将其设置为true,可以跳出睡眠,重新复苏

  private final Log LOG = LogFactory.getLog(this.getClass().getName());
  private final int period;
  private final Stoppable stopper;
  private static final long MINIMAL_DELTA_FOR_LOGGING = 10000;

  private final Object sleepLock = new Object();
  private boolean triggerWake = false;

再来看一下Sleep类最重要的一个方法——sleep(),查看sleep()方法:该方法会根据参数睡眠的起始时间startTime,结合睡眠器构造时设定好的睡眠周期period,以及当前时间now,计算出等待时间waitTime。而后,在一个等待时间waitTime大于0的while循环内,首先判断标志位triggerWake,如果其为true,则break,复位triggerWake并停止休眠,否则,利用sleepLock的wait()方法休眠指定时间waitTime,直到时间结束或者有其他线程设置triggerWake标志位为true并通过sleepLock的notifyAll()方法唤醒sleepLock对象,让其wait()方法抛出InterruptedException异常,继而重新计算等待时间,并进入下一个循环。此时,标志位triggerWake已设置为true,直接跳出循环,结束休眠

  public void sleep(final long startTime) {
    //如果stopper已停止,则直接返回
    if (this.stopper.isStopped()) {
      return;
    }
    long now = System.currentTimeMillis();
    long waitTime = this.period - (now - startTime);
    //如果等待时间waitTime超过周期period,那么直接将period赋值给waitTime,并记录警告信息
    if (waitTime > this.period) {
      LOG.warn("Calculated wait time > " + this.period +
        "; setting to this.period: " + System.currentTimeMillis() + ", " +
        startTime);
      waitTime = this.period;
    }
    //等待时间waitTime大于0则一直循环
    while (waitTime > 0) {
      long woke = -1;
      try {
        //判断标志位triggerWake,如果其他线程已唤醒该睡眠期,则跳出循环,复位triggerWake为fale,直接返回,不再睡眠
        synchronized (sleepLock) {
          if (triggerWake) break;
          sleepLock.wait(waitTime);
        }
        //计算已睡眠时间slept
        woke = System.currentTimeMillis();
        long slept = woke - now;
        //如果slept时间超出周期10s,则记录警告信息
        if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) {
          LOG.warn("We slept " + slept + "ms instead of " + this.period +
              "ms, this is likely due to a long " +
              "garbage collecting pause and it's usually bad, see " +
              "http://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired");
        }
      } catch(InterruptedException iex) {
        if (this.stopper.isStopped()) {
          return;
        }
      }
      //重新计算等待时间:等待周期减去已睡眠时间
      woke = (woke == -1)? System.currentTimeMillis(): woke;
      waitTime = this.period - (woke - startTime);
    }
    triggerWake = false;
  }

 现在回到Chore类,查看Chore类最重要的方法run():该方法中只要stopper不停止,while循环就会进行。首先,第一次进入循环时,标志位initialChoreComplete初始化为false,标志着Chore尚未初始化完毕,此时调用initialChore()做初始化工作,并返回初始化结果赋值给标志位initialChoreComplete,而第一次循环后的每次循环,会周期性的调用chore()方法,每次调用完chore()方法后,都通过睡眠器sleeper的sleep()方法,从每次进入while循环时获取的时刻startTime开始,休眠Chore构造函数传入的p时间。如果stopper已停止,或者发生Throwable异常,则Chore调用cleanup()完成清理工作

  public void run() {
    try {
      boolean initialChoreComplete = false;
      //只要stopper不停止,while就继续
      while (!this.stopper.isStopped()) {
        //开始时间
        long startTime = System.currentTimeMillis();
        try {
          //如果是第一次循环,则完成初始化工作
          if (!initialChoreComplete) {
            initialChoreComplete = initialChore();
          } else {
            //如果不是第一次循环,则调用chore()方法
            chore();
          }
        } catch (Exception e) {
          LOG.error("Caught exception", e);
          if (this.stopper.isStopped()) {
            continue;
          }
        }
        //睡眠一定的时间
        this.sleeper.sleep(startTime);
      }
    } catch (Throwable t) {
      LOG.fatal(getName() + "error", t);
    } finally {
      LOG.info(getName() + " exiting");
      cleanup();
    }
  }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682220.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号