2021SC@SDUSC
目录
一、引言
二、整体流程图
三、部分源码分析
一、引言
Hbase是一种基于LSM模型(Log-Structured Merge-Trees 即日志结构合并树)的分布式数据库。相比于Oracle普通索引所采用的B+树,LSM模型的一大特点就是,在读写之间采取一种平衡,牺牲部分读数据的性能,来大幅度的提升写数据的性能,所以Hbase写数据才能够如此快(将数据写入内存和日志文件后即立即返回)。
但是,将数据存放在内存和日志中是不妥当的,内存是非常有限且稀缺的并且重要的,持续的写入会造成内存的溢出,而日志的写入仅是由于内存数据系统宕机或进程退出后立刻消失而采取的一种保护性措施,不是作为最终数据持久化的手段,另外写入日志时仅仅是简单的追加,这会大大降低读数据的效率。
MemStore的flush就是为了解决上述问题而采取的一种有效措施。
二、整体流程图
三、部分源码分析
HRegion上flush的入口方法为flushCache()
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException {...}
首先需要判断下HRegion的状态,如果Region正处于关闭状态,记录日志,并返回CANNOT_FLUSH的刷新结果
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
LOG.debug(msg);
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
获取任务追踪器,并创建初始状态,设置任务追踪器的状态:请求Region读锁。
获取Region的读锁之后,阻塞等待刷新缓存的锁释放;
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.enableStatusJournal(false);
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
lock.readLock().lock();
再次判断HRegion的状态,如果Region已经下线,记录日志并返回CANNOT_FLUSH的结果
if (this.closed.get()) {
String msg = "Skipping flush on " + this + " because closed";
LOG.debug(msg);
status.abort(msg);
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
如果协处理器不为空:
设置任务追踪器的状态:执行协处理器预刷写钩子preFlush()方法:Running coprocessor pre-flush hooks
执行协处理器预刷写钩子preFlush()方法
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush(tracker);
}
如果writestate不是flushing,且writestate的可以读取启用,将状态中的flushing设置为true,表示正在刷新,否则记录日志,并返回CANNOT_FLUSH的结果
synchronized (writestate) {
if (!writestate.flushing && writestate.writesEnabled) {
this.writestate.flushing = true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("NOT flushing memstore for region " + this
+ ", flushing=" + writestate.flushing + ", writesEnabled="
+ writestate.writesEnabled);
}
String msg = "Not flushing since "
+ (writestate.flushing ? "already flushing"
: "writes not enabled");
status.abort(msg);
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
}
调用internalFlushcache()方法,执行真正的flush
CollectionspecificStoresToFlush = forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); FlushResultImpl fs = internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
刷新结束后,如果协处理器不为空,设置状态,即Running post-flush coprocessor hooks,并执行协处理器的钩子方法postFlush()
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
coprocessorHost.postFlush(tracker);
}
状态追踪器标记完成状态:Flush successful
if(fs.isFlushSucceeded()) {
flushesQueued.reset();
}
status.markComplete("Flush successful " + fs.toString());
return fs;
将writestate中的flushing、flushRequested均设置为false
synchronized (writestate) {
writestate.flushing = false;
this.writestate.flushRequested = false;
writestate.notifyAll();
}
释放读锁,并清空状态追踪器的状态
lock.readLock().unlock();
LOG.debug("Flush status journal for {}:n{}", this.getRegionInfo().getEncodedName(),
status.prettyPrintJournal());
status.cleanup();
返回刷新结果。
if(fs.isFlushSucceeded()) {
flushesQueued.reset();
}
status.markComplete("Flush successful " + fs.toString());
return fs;
//return语句在try中,所以会在finally语句执行后返回。



