2021SC@SDUSC
目录
Compact发起时机
2021SC@SDUSC
在这学期的最后一篇博客将compact收尾
Compact发起时机
上一篇博客已经把Chore类分析完了,这篇博客回到HRegionServer的成员变量Chore compactionChecker
compactionChecker的初始化在HRegionServer的initializeThreads()方法,具体代码为
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
可以看到compactionChecker被初始化为CompactionChecker类的一个对象,查看CompactionChecker的构造方法:首先用父类Chore的构造方法,设置period和stopper,其中工period是HRegionServer的threadWakeFrequency变量,取自参数hbase.server.thread.wakefrequency,默认为10s,然后将HRegionServer赋值给instance变量,并设置major合并优先级
CompactionChecker(final HRegionServer h, final int sleepTime,
final Stoppable stopper) {
//调用父类Chore的构造方法
super("CompactionChecker", sleepTime, h);
//将HRegionServer赋值给instance变量
this.instance = h;
LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
//设置major合并优先级
this.majorCompactPriority = this.instance.conf.
getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
DEFAULT_PRIORITY);
}
分析完compactionChecker的初始化之后,再来看一下它是如何工作的
查看CompactionChecker.chore()方法:该方法周期性的检测HRegionServer中所有的onlineRegion的每个HStore,调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier,当迭代因子iteration为合并检查倍增器multiplier的整数倍时,发起针对该HStore是否需要compact的检查,如果需要合并,则根据合并的种类,确定发起何种合并请求,并且如果是Major合并的话,需要确定优先级
@Override
protected void chore() {
//检测HRegionServer的onlineRegions中的每个HRegion
for (HRegion r : this.instance.onlineRegions.values()) {
if (r == null)
continue;
//取出每个region的store
for (Store s : r.getStores().values()) {
try {
//调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier
long multiplier = s.getCompactionCheckMultiplier();
//确保合并检查倍增器multiplier大于0
assert multiplier > 0;
//如果迭代因子iteration不是multiplier的整数倍,则直接进入下一次循环,只有当iteration是合并检查倍增器multiplier的整数倍时,才会发起检查
if (iteration % multiplier != 0) continue;
//如果需要合并,则发起SystemCompaction请求
if (s.needsCompaction()) {
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+ " requests compaction");
} else if (s.isMajorCompaction()) {
//如果是Major合并的话,根据配置的major合并优先级majorCompactPriority确定发起合并请求
//如果设置的合并优先级为Integer.MAX_VALUE或者HRegion的合并优先级小于设置值的话
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
//使用默认优先级发起合并请求
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null);
} else {
//使用设置的优先级发起合并请求
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null, null);
}
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
//迭代计数器累加
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}
}
再来看一下该方法的一些细节
- 首先,onlineRegions是指HRegionServer上存储的所有能够提供有效服务的在线Region集合;
- 该方法遍历所有的HRegion时会将HRegion上每个HStore都进行遍历,但是它并不是对每个HRegion上所有HStore都进行检查,而是利用取余算法选择性地对HRegion上的HStore进行检查。而这个过程的关键就是合并检查倍增器multiplier,该值如果配置为1的话,即为每个都进行检查,如果配置成2的话,则是隔一个检查一个,依次类推。合并检查倍增器multiplier是通过HStore的getCompactionCheckMultiplier()方法获取的,该方法会返回的HStore的compactionCheckMultiplier变量,而compactionCheckMultiplier是通过参数hbase.server.compactchecker.interval.multiplier完成初始化的。具体代码如下:
public long getCompactionCheckMultiplier() { return this.compactionCheckMultiplier; }this.compactionCheckMultiplier = conf.getInt( COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); if (this.compactionCheckMultiplier <= 0) { LOG.error("Compaction check period multiplier must be positive, setting default: " + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; - 通过HStore的needsCompaction()方法判断是否需要进行compact,该方法会经过一系列的调用,最终的实现逻辑是storeFile的总数减去正在合并的文件的数目,如果这个数目超过配置中合并文件的最小值,则视为需要发起合并请求。相关代码不再贴出
- 如果需要合并,则调用CompactSplitThread的requestSystemCompaction()方法发起SystemCompaction请求,如果是Major合并的话,则根据配置的major合并优先级majorCompactPriority确定发起合并请求,继而调用CompactSplitThread的requestCompaction()方法发起合并请求
至此,对于compact的全部分析已经结束



