2021SC@SDUSC
目录
Compact介绍
Compact定义
Compact分类
compact源码分析
Compact介绍 Compact定义
Compact是指Hbase表中HRegion上某个Column Family下,部分或全部HFiles的合并。它是由于数据在持续写入后,MemStore达到一定阈值,被flush到磁盘生成hfile,随着hfile的逐渐增多,如果不做处理,将会严重影响Hbase数据读取的效率。所以,在Hbase系统内部,需要定期在满足一定条件的情况下,或者人为手动触发,将这许多文件合并成一个大文件,即compact。
Compact分类
Compact分为两种:Minor Compact、Major Compact。
Minor Compact:选择部分小的、相邻的hfile合并为一个大文件;
Major Compat:将store中所有的hfile文件合并成一个大的文件,并且这个阶段会将过期的数据、已删除的数据和超出版本的数据进行物理上的清除。
compact源码分析
compact的发起方法是HRegion的compact()方法,HRegion的compact()方法:
- 首先确认合并上下文不为空,即合并请求request不为空,且请求request中所包含的文件不为空;
- 如果Region正在下线或者已经下线,记录日志,取消合并请求,返回false,取消合并调用的是Store的cancelRequestedCompaction()方法;
- 设置一个标志位:requestNeedsCancellation,表示是否需要撤销合并请求;
- 获取HRegion上lock的读锁;
- 从store中获取列族名;
- 根据列族名从stores中获取store,如果其和传入的store不相等,则记录warn日志,并返回false。此时,对应store已在该HRegion上重新初始化,我们要取消此次合并请求;
- 获取任务状态监控器,并记录状态:Compacting storename in regionname;
- 如果Region已下线,跳过合并,返回false;
- 设置一个状态位wasStateSet;
- 判断writestate,确认Region可写,并累加合并正在进行的数目;
- 任务状态监控器记录状态:Compacting store storename;
- 标志位requestNeedsCancellation设置为false,表示此时compact可以真正执行;
- 调用store的compact方法,执行合并;
- 如果标志位wasStateSet为true,则合并正在进行的数目writestate.compacting减一;并且如果没有合并在进行,则唤醒其他阻塞线程
- 任务状态监控器记录状态:Compaction complete;
- 返回处理结果true;
- 在finally中需要做如下处理:a)如果需要取消合并,调用Store的cancelRequestedCompaction()方法取消合并;b)清空状态跟踪器;c)释放读锁。
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
//确认合并上下文不为空,即合并请求request不为空,且请求request中所包含的文件不为空
assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty();
//如果Region正在下线或者已经下线,记录日志,并取消合并请求,返回false
//取消合并调用的是Store的cancelRequestedCompaction()方法
if (this.closing.get() || this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closing/closed");
store.cancelRequestedCompaction(compaction);
return false;
}
//任务状态监控器
MonitoredTask status = null;
//标志位,请求需要撤销
boolean requestNeedsCancellation = true;
//阻塞,等待合并的锁
//这个lock锁是Region行为上的一个读写锁,加上这个锁,控制Region的整体行为,比如flush、compact、close等
//flush和compact使用的是读锁,是一个共享锁,意味着flush和compact可以同步进行,但是不能执行close,因为close是写锁,
//这是一个独占锁,一旦被占用,其他线程就不能发起flush、compact等操作
lock.readLock().lock();
try {
//获取列族名
byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
//如果根据列族名从stores中获取的store,和传入的store不相等,则记录warn日志,并返回false
if (stores.get(cf) != store) {
//此时,对应store已在该HRegion上被重新初始化,我们要取消此次合并请求,这种情况可能是由于分裂事务回滚时造成的
LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
+ " has been re-instantiated, cancel this compaction request. "
+ " It may be caused by the roll back of split transaction");
return false;
}
//任务状态监控器记录状态:Compacting storename in regionname
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
//如果Region已下线,跳过合并
if (this.closed.get()) {
String msg = "Skipping compaction on " + this + " because closed";
LOG.debug(msg);
status.abort(msg);
return false;
}
//状态位
boolean wasStateSet = false;
try {
//判断writestate,确认Region可写,并累加合并正在进行的数目
synchronized (writestate) {
if (writestate.writesEnabled) {
//如果Region可写,累加合并进行的数目,标志位wasStateSet设置为true
wasStateSet = true;
++writestate.compacting;
} else {
//如果Region不可写,记录log信息,舍弃该状态
String msg = "NOT compacting region " + this + ". Writes disabled.";
LOG.info(msg);
status.abort(msg);
return false;
}
}
LOG.info("Starting compaction on " + store + " in region " + this
+ (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
doRegionCompactionPrep();
try {
//任务状态监控器记录状态:Compacting store storename
status.setStatus("Compacting store " + store);
//标志位requestNeedsCancellation设置为false,说明此时compact可以真正执行
requestNeedsCancellation = false;
//调用store的compact方法,执行合并
store.compact(compaction, throughputController, user);
} catch (InterruptedIOException iioe) {
String msg = "compaction interrupted";
LOG.info(msg, iioe);
status.abort(msg);
return false;
}
} finally {
if (wasStateSet) {
//如果合并正在进行的数目已经累加,合并正在进行的数目减一
synchronized (writestate) {
--writestate.compacting;
//如果没有合并在进行,唤醒其他阻塞线程
if (writestate.compacting <= 0) {
writestate.notifyAll();
}
}
}
}
//任务状态监控器记录状态:合并完成
status.markComplete("Compaction complete");
return true;
} finally {
try {
// 如果需要取消合并,调用Store的cancelRequestedCompaction()方法取消合并
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
//清空状态跟踪器
if (status != null) status.cleanup();
} finally {
//释放读锁
lock.readLock().unlock();
}
}
}
再来看一下上述方法的一个重要参数CompactionContext类型的compaction
CompactionContext是合并的上下文类。该类含有运行一个合并所必需的全部“物理”细节,其内部包含一个合并请求CompactionRequest类型的request,同时包含针对该请求的判断、获取、赋值方法,相关代码如下
//设置合并请求
public void forceSelect(CompactionRequest request) {
this.request = request;
}
//获取合并请求
public CompactionRequest getRequest() {
assert hasSelection();
return this.request;
}
//判断合并请求是否为空,意思也就是是否已选择文件
public boolean hasSelection() {
return this.request != null;
}
同时CompactionContext本身是一个抽象类,它有两个子类,分别为:位于DefaultStoreEngine中的DefaultCompactionContext和位于StripeStoreEngine中的StripeCompaction;
这两个CompactionContext的子类都分别实现了用于选择需要合并文件的select()方法、在已选择文件的基础上执行合并的compact()方法,有关这两个类会在后续博客中详细分析。



