2021SC@SDUSC
目录
Compact回顾
Compact发起时机
2021SC@SDUSC
过去的五篇里分析了compact的流程,从这篇开始分析compact的发起时机、判断条件。
Compact回顾
先来回顾一下compact的定义与作用。Compact是指Hbase表中HRegion上某个Column Family下,部分或全部HFiles的合并。它是由于数据在持续写入后,MemStore达到一定阈值,被flush到磁盘生成hfile,随着hfile的逐渐增多,如果不做处理,将会严重影响Hbase数据读取的效率。所以,在Hbase系统内部,需要定期在满足一定条件的情况下,或者人为手动触发,将这许多文件合并成一个大文件,即compact。Compact分为两种:Minor Compact、Major Compact。Minor Compact是指选择部分小的、相邻的hfile合并为一个大文件;Major Compat是指将store中所有的hfile文件合并成一个大的文件,并且这个阶段会将过期的数据、已删除的数据和超出版本的数据进行物理上的清除。
Compact发起时机
在HRegionServer内有一个成员变量Chore compactionChecker,compactionChecker负责检查compact请求,查看Chore的定义:可以看出Chore继承自HasThread类,HasThread类是一个实现了Runnable接口的抽象类,并且定义了一个抽象的run()方法。所以,Chore是一个线程。再结合源码中给的注释,可以看出 a)Chore是定期在Hbase中执行的一个任务;b)Chore在它所在的线程内执行;c)这个基础抽象类提供了loop循环和sleep机制。
public abstract class Chore extends HasThread {
}
再来查看一下Chore的成员变量:Chore提供的sleep机制就是依靠sleeper来实现的,而stopper可以是任何实现了Stoppable接口的类的实例
private final Log LOG = LogFactory.getLog(this.getClass()); private final Sleeper sleeper; protected final Stoppable stopper;
再来看一下Chore的构造函数:可以看到Chore的构造函数的构造函数需要String name, final int p, final Stoppable stopper三个参数。其中name是Chore的名称,而p和stopper用来构造一个sleeper
public Chore(String name, final int p, final Stoppable stopper) {
super(name);
if (stopper == null){
throw new NullPointerException("stopper cannot be null");
}
this.sleeper = new Sleeper(p, stopper);
this.stopper = stopper;
}
接下来看一下Sleeper类,首先查看Sleeper类的成员变量:其中period表示睡眠周期,通过上述构造方法中的p赋值;sleepLock是一个Object对象,依靠它的wait()方法,可以实现对象等待一段时间;triggerWake是一个标志位,将其设置为true,可以跳出睡眠,重新复苏
private final Log LOG = LogFactory.getLog(this.getClass().getName()); private final int period; private final Stoppable stopper; private static final long MINIMAL_DELTA_FOR_LOGGING = 10000; private final Object sleepLock = new Object(); private boolean triggerWake = false;
再来看一下Sleep类最重要的一个方法——sleep(),查看sleep()方法:该方法会根据参数睡眠的起始时间startTime,结合睡眠器构造时设定好的睡眠周期period,以及当前时间now,计算出等待时间waitTime。而后,在一个等待时间waitTime大于0的while循环内,首先判断标志位triggerWake,如果其为true,则break,复位triggerWake并停止休眠,否则,利用sleepLock的wait()方法休眠指定时间waitTime,直到时间结束或者有其他线程设置triggerWake标志位为true并通过sleepLock的notifyAll()方法唤醒sleepLock对象,让其wait()方法抛出InterruptedException异常,继而重新计算等待时间,并进入下一个循环。此时,标志位triggerWake已设置为true,直接跳出循环,结束休眠
public void sleep(final long startTime) {
//如果stopper已停止,则直接返回
if (this.stopper.isStopped()) {
return;
}
long now = System.currentTimeMillis();
long waitTime = this.period - (now - startTime);
//如果等待时间waitTime超过周期period,那么直接将period赋值给waitTime,并记录警告信息
if (waitTime > this.period) {
LOG.warn("Calculated wait time > " + this.period +
"; setting to this.period: " + System.currentTimeMillis() + ", " +
startTime);
waitTime = this.period;
}
//等待时间waitTime大于0则一直循环
while (waitTime > 0) {
long woke = -1;
try {
//判断标志位triggerWake,如果其他线程已唤醒该睡眠期,则跳出循环,复位triggerWake为fale,直接返回,不再睡眠
synchronized (sleepLock) {
if (triggerWake) break;
sleepLock.wait(waitTime);
}
//计算已睡眠时间slept
woke = System.currentTimeMillis();
long slept = woke - now;
//如果slept时间超出周期10s,则记录警告信息
if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) {
LOG.warn("We slept " + slept + "ms instead of " + this.period +
"ms, this is likely due to a long " +
"garbage collecting pause and it's usually bad, see " +
"http://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired");
}
} catch(InterruptedException iex) {
if (this.stopper.isStopped()) {
return;
}
}
//重新计算等待时间:等待周期减去已睡眠时间
woke = (woke == -1)? System.currentTimeMillis(): woke;
waitTime = this.period - (woke - startTime);
}
triggerWake = false;
}
现在回到Chore类,查看Chore类最重要的方法run():该方法中只要stopper不停止,while循环就会进行。首先,第一次进入循环时,标志位initialChoreComplete初始化为false,标志着Chore尚未初始化完毕,此时调用initialChore()做初始化工作,并返回初始化结果赋值给标志位initialChoreComplete,而第一次循环后的每次循环,会周期性的调用chore()方法,每次调用完chore()方法后,都通过睡眠器sleeper的sleep()方法,从每次进入while循环时获取的时刻startTime开始,休眠Chore构造函数传入的p时间。如果stopper已停止,或者发生Throwable异常,则Chore调用cleanup()完成清理工作
public void run() {
try {
boolean initialChoreComplete = false;
//只要stopper不停止,while就继续
while (!this.stopper.isStopped()) {
//开始时间
long startTime = System.currentTimeMillis();
try {
//如果是第一次循环,则完成初始化工作
if (!initialChoreComplete) {
initialChoreComplete = initialChore();
} else {
//如果不是第一次循环,则调用chore()方法
chore();
}
} catch (Exception e) {
LOG.error("Caught exception", e);
if (this.stopper.isStopped()) {
continue;
}
}
//睡眠一定的时间
this.sleeper.sleep(startTime);
}
} catch (Throwable t) {
LOG.fatal(getName() + "error", t);
} finally {
LOG.info(getName() + " exiting");
cleanup();
}
}



