2021SC@SDUSC
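In the previous articles we covered the MemStore flush flow on HRegion and on HRegionServer. This article looks at when a MemStore flush is initiated in HBase.
As we saw earlier, a flush request is added to the MemStoreFlusher's flushQueue through requestFlush() or requestDelayedFlush(), and is then consumed by the internal group of FlushHandler threads, which process each HRegion that needs flushing. The call sites of these two MemStoreFlusher methods are therefore some of the points where a MemStore flush is initiated. Other operations and internal flows may instead trigger a flush by calling HRegion's flushcache() directly. Below we summarize the operations and internal flows that trigger a flush, together with the conditions they check.
I. Adding a flush request to the MemStoreFlusher's flushQueue
1. Single Put
Before doing any work, the put(Put put) method in HRegion calls checkResources() to check whether the HRegion's MemStore size exceeds a threshold. If it does, checkResources() calls requestFlush() to request a flush of that HRegion's MemStore and throws a RegionTooBusyException, which prevents the operation from continuing. The code of checkResources() is as follows: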
private void checkResources()
    throws RegionTooBusyException {
  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;
  if (this.memstoreSize.get() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    throw new RegionTooBusyException("Above memstore limit, " +
        "regionName=" + (this.getRegionInfo() == null ? "unknown" :
        this.getRegionInfo().getRegionNameAsString()) +
        ", server=" + (this.getRegionServerServices() == null ? "unknown" :
        this.getRegionServerServices().getServerName()) +
        ", memstoreSize=" + memstoreSize.get() +
        ", blockingMemStoreSize=" + blockingMemStoreSize);
  }
}
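It first checks whether this is the meta Region; if so, no resource constraints are imposed and updates are not blocked.
If the Region's current MemStore size exceeds the threshold, it increments the blocked-requests counter, requests a MemStore flush, and then throws a RegionTooBusyException to stop the operation.
memstoreSize is the size of the MemStore on the HRegion at the current moment. It is updated in real time during Put and similar operations through calls to addAndGetGlobalMemstoreSize():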
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
  if (this.rsAccounting != null) {
    rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
  }
  return this.memstoreSize.addAndGet(memStoreSize);
}
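blockingMemStoreSize is a MemStore threshold set on the HRegion; once the MemStore grows beyond it, data updates are blocked. It is defined in setHTableSpecificConf(), which is called when the HRegion is constructed as it comes online: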
this.blockingMemStoreSize = this.memstoreFlushSize *
    conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
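As the code shows, it is memstoreFlushSize multiplied by a factor (4 by default if not configured).
So what is memstoreFlushSize? It is another threshold set on the HRegion: when the MemStore grows beyond it, a flush request is issued. It is determined first by the table, that is, each table can set its own memstoreFlushSize; only when the table does not set a positive value does the configured value apply: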
if (this.htableDescriptor == null) return;
long flushSize = this.htableDescriptor.getMemStoreFlushSize();
if (flushSize <= 0) {
  flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
}
this.memstoreFlushSize = flushSize;
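To make the relationship between the two thresholds concrete, here is a minimal sketch of the arithmetic. FlushThresholds and its constants are hypothetical names introduced for illustration, not HBase classes, and the default values (128 MB flush size, multiplier 4) are assumptions that should be checked against HConstants and HTableDescriptor in the HBase version you use.

```java
// Hypothetical helper, not HBase code: mirrors the threshold arithmetic described in the text.
final class FlushThresholds {
    // Assumed defaults for illustration only; verify them against your HBase version.
    static final long ASSUMED_DEFAULT_FLUSH_SIZE = 128L * 1024 * 1024;
    static final long ASSUMED_DEFAULT_BLOCK_MULTIPLIER = 4;

    private FlushThresholds() {}

    /** A positive table-level value wins; otherwise the configured value is used. */
    static long resolveFlushSize(long tableFlushSize, long configuredFlushSize) {
        return tableFlushSize > 0 ? tableFlushSize : configuredFlushSize;
    }

    /** The blocking threshold is the flush threshold times the block multiplier. */
    static long blockingSize(long flushSize, long multiplier) {
        return flushSize * multiplier;
    }
}
```

Under these assumed defaults, a region asks for a flush once its MemStore passes 128 MB and starts rejecting writes once it passes 512 MB.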
Next, let's look at how requestFlush() sends the request:
private void requestFlush() {
  if (this.rsServices == null) {
    return;
  }
  synchronized (writestate) {
    if (this.writestate.isFlushRequested()) {
      return;
    }
    writestate.flushRequested = true;
  }
  this.rsServices.getFlushRequester().requestFlush(this);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Flush requested on " + this);
  }
}
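The logic is as follows:
- If the HRegion's rsServices is null, return immediately.
- Check writestate: if flushRequested is already set, return immediately to avoid a duplicate request. Otherwise set writestate's flushRequested to true, obtain the FlushRequester from rsServices, and call its requestFlush() with this HRegion to issue the flush request.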
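The de-duplication step above can be sketched in isolation as follows. FlushRequestSketch, its String-based queue, and the flushCompleted() reset are assumptions made for illustration; the real code enqueues the HRegion into the MemStoreFlusher and clears the flag as part of the flush itself.

```java
import java.util.Queue;

// Hypothetical stand-in for the flag-plus-queue pattern in HRegion.requestFlush().
final class FlushRequestSketch {
    private final Object writestate = new Object();
    private boolean flushRequested = false;
    private final String regionName;
    private final Queue<String> flushQueue;

    FlushRequestSketch(String regionName, Queue<String> flushQueue) {
        this.regionName = regionName;
        this.flushQueue = flushQueue;
    }

    /** Returns true only if this call actually enqueued a request. */
    boolean requestFlush() {
        synchronized (writestate) {
            if (flushRequested) {
                return false; // a request is already pending, do not queue another
            }
            flushRequested = true;
        }
        flushQueue.add(regionName);
        return true;
    }

    /** Assumed hook: clears the flag once a flush has run so new requests can be queued. */
    void flushCompleted() {
        synchronized (writestate) {
            flushRequested = false;
        }
    }
}
```

Setting the flag inside the synchronized block and enqueueing outside it keeps the critical section short while still guaranteeing at most one pending request per region.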
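Back in put(), the call chain goes through doBatchMutate() and then two overloads of batchMutate() to perform the Put. batchMutate() is designed for batches of Put, Delete, and similar operations; a single Put simply passes an array containing one operation. The method loops until the whole batch is done. On each iteration it calls checkResources() to check the MemStore, calls doMiniBatchMutation() to apply the mutations, updates the HRegion's MemStore size to obtain newSize, and finally uses isFlushSize() to decide whether to call requestFlush():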
/**
 * Perform a batch of mutations.
 * It supports only Put and Delete mutations and will ignore other types passed.
 * @param batchOp contains the list of mutations
 * @return an array of OperationStatus which internally contains the
 *         OperationStatusCode and the exceptionMessage if any.
 * @throws IOException
 */
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
  boolean initialized = false;
  Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
  startRegionOperation(op);
  try {
    while (!batchOp.isDone()) {
      if (!batchOp.isInReplay()) {
        checkReadonly();
      }
      checkResources();
      if (!initialized) {
        this.writeRequestsCount.add(batchOp.operations.length);
        if (!batchOp.isInReplay()) {
          doPreMutationHook(batchOp);
        }
        initialized = true;
      }
      long addedSize = doMiniBatchMutation(batchOp);
      long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
      if (isFlushSize(newSize)) {
        requestFlush();
      }
    }
  } finally {
    closeRegionOperation(op);
  }
  return batchOp.retCodeDetails;
}
isFlushSize() decides by checking whether the current MemStore size newSize exceeds memstoreFlushSize:
private boolean isFlushSize(final long size) {
  return size > this.memstoreFlushSize;
}
This completes the single Put operation.
2. Single Delete
This is largely the same as a single Put:
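The interplay of the two checks in the Put path (reject before writing when above the blocking threshold, request a flush after writing when above the flush threshold) can be simulated with a small sketch. WritePathSketch and RegionTooBusy are hypothetical names; the sketch counts raw flush requests without de-duplicating them, and it never shrinks the MemStore because no flush is simulated.

```java
// Hypothetical simulation of the per-mini-batch checks in batchMutate(); not HBase code.
final class WritePathSketch {
    /** Stand-in for RegionTooBusyException, unchecked here to keep the sketch short. */
    static final class RegionTooBusy extends RuntimeException {
        RegionTooBusy(String message) {
            super(message);
        }
    }

    private final long flushSize;
    private final long blockingSize;
    private long memstoreSize = 0;
    int flushRequests = 0; // raw count of requestFlush() calls

    WritePathSketch(long flushSize, long blockingSize) {
        this.flushSize = flushSize;
        this.blockingSize = blockingSize;
    }

    long memstoreSize() {
        return memstoreSize;
    }

    /** Before writing: above the blocking threshold, request a flush and reject. */
    private void checkResources() {
        if (memstoreSize > blockingSize) {
            flushRequests++;
            throw new RegionTooBusy("Above memstore limit, memstoreSize=" + memstoreSize);
        }
    }

    /** Applies each mini-batch, then requests a flush if the flush threshold is exceeded. */
    void write(long[] miniBatchSizes) {
        for (long added : miniBatchSizes) {
            checkResources();
            memstoreSize += added;
            if (memstoreSize > flushSize) {
                flushRequests++;
            }
        }
    }
}
```

Because checkResources() runs on every loop iteration, a large batch can be rejected partway through once earlier mini-batches have pushed the MemStore past the blocking threshold.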
//
// set() methods for client use.
//
public void delete(Delete delete)
    throws IOException {
  checkReadonly();
  checkResources();
  startRegionOperation(Operation.DELETE);
  try {
    delete.getRow();
    // All edits for the given row (across all column families) must happen atomically.
    doBatchMutate(delete);
  } finally {
    closeRegionOperation(Operation.DELETE);
  }
}
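It also calls checkResources() to check the MemStore first, and then calls doBatchMutate() to do the work.
3. checkAndMutate/checkAndRowMutate
4. Single Append
5. Single Increment
6. Batch operations
In summary, every operation that updates data checks the MemStore first. If it is above the blocking threshold, a flush request is sent and an exception is thrown, rejecting the operation. After the operation completes, the growth of the MemStore is checked against the flush threshold to decide whether to send a flush request.
II. Calling HRegion's flushcache() directly
1. External triggers such as the command line
This kind of flush is initiated through the flushRegion() method of RSRpcServices on the RegionServer: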
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
    final FlushRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    HRegion region = getRegion(request.getRegion());
    LOG.info("Flushing " + region.getRegionNameAsString());
    boolean shouldFlush = true;
    if (request.hasIfOlderThanTs()) {
      shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
    }
    FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
    if (shouldFlush) {
      long startTime = EnvironmentEdgeManager.currentTime();
      HRegion.FlushResult flushResult = region.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      boolean result = flushResult.isCompactionNeeded();
      if (result) {
        regionServer.compactSplitThread.requestSystemCompaction(region,
            "Compaction through user triggered flush");
      }
      builder.setFlushed(result);
    }
    builder.setLastFlushTime(region.getLastFlushTime());
    return builder.build();
  } catch (DroppedSnapshotException ex) {
    // Cache flush can fail in a few places. If it fails in a critical
    // section, we get a DroppedSnapshotException and a replay of wal
    // is required. Currently the only way to do this is a restart of
    // the server.
    regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
    throw new ServiceException(ex);
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
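After deciding that a flush should happen, the method calls HRegion's flushcache() to flush its MemStore.
2. Region merge
RSRpcServices has a mergeRegions() method that is called when Regions are merged. It calls flushcache() on regionA and then on regionB to flush each Region's MemStore before performing the merge: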
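The shouldFlush decision in flushRegion() boils down to a single comparison, sketched here on its own. FlushGateSketch is a hypothetical name, and OptionalLong stands in for the request's optional ifOlderThanTs field.

```java
import java.util.OptionalLong;

// Hypothetical restatement of the shouldFlush decision; not HBase code.
final class FlushGateSketch {
    private FlushGateSketch() {}

    /**
     * With no timestamp in the request the flush always runs. With a timestamp,
     * it runs only if the last flush happened strictly before that timestamp.
     */
    static boolean shouldFlush(long lastFlushTime, OptionalLong ifOlderThanTs) {
        return !ifOlderThanTs.isPresent() || lastFlushTime < ifOlderThanTs.getAsLong();
    }
}
```

This lets a caller skip redundant work: a region that has already flushed at or after the given timestamp is left alone.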
long startTime = EnvironmentEdgeManager.currentTime();
HRegion.FlushResult flushResult = regionA.flushcache();
if (flushResult.isFlushSucceeded()) {
  long endTime = EnvironmentEdgeManager.currentTime();
  regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
startTime = EnvironmentEdgeManager.currentTime();
flushResult = regionB.flushcache();
if (flushResult.isFlushSucceeded()) {
  long endTime = EnvironmentEdgeManager.currentTime();
  regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
    masterSystemTime);
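The merge path repeats the same pattern for each region: time the flush and update the metric only when the flush succeeded. A minimal sketch of that pattern follows. TimedFlushSketch is a hypothetical name, the injected clock replaces EnvironmentEdgeManager.currentTime(), and a BooleanSupplier stands in for flushcache() plus isFlushSucceeded().

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch of "record flush time only on success"; not HBase code.
final class TimedFlushSketch {
    private final LongSupplier clock;
    long recordedFlushMillis = 0; // total time of successful flushes
    int recordedFlushes = 0;      // number of successful flushes recorded

    TimedFlushSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Runs the flush, records its duration if it succeeded, and returns the outcome. */
    boolean flushAndRecord(BooleanSupplier flush) {
        long startTime = clock.getAsLong();
        boolean succeeded = flush.getAsBoolean();
        if (succeeded) {
            recordedFlushMillis += clock.getAsLong() - startTime;
            recordedFlushes++;
        }
        return succeeded;
    }
}
```

Injecting the clock keeps the sketch testable without real waiting, which is one plausible reason for routing time lookups through a single manager class.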
3. Region split
RSRpcServices also has a splitRegion() method that is called when a Region is split. It likewise calls flushcache() first to flush the Region's MemStore:
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
    final SplitRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    HRegion region = getRegion(request.getRegion());
    region.startRegionOperation(Operation.SPLIT_REGION);
    LOG.info("Splitting " + region.getRegionNameAsString());
    long startTime = EnvironmentEdgeManager.currentTime();
    HRegion.FlushResult flushResult = region.flushcache();
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
    byte[] splitPoint = null;
    if (request.hasSplitPoint()) {
      splitPoint = request.getSplitPoint().toByteArray();
    }
    region.forceSplit(splitPoint);
    regionServer.compactSplitThread.requestSplit(region, region.checkSplit());
    return SplitRegionResponse.newBuilder().build();
  } catch (DroppedSnapshotException ex) {
    regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
    throw new ServiceException(ex);
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
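4. Bulk loading HFiles
Bulk loading is a fast and practical way for HBase to store data by loading HFiles directly. When assignSeqId is true, the bulkLoadHFiles() method in HRegion also calls flushcache() to flush the HRegion's MemStore: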
if (assignSeqId) {
  FlushResult fs = this.flushcache();
  if (fs.isFlushSucceeded()) {
    seqId = fs.flushSequenceId;
  } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
    seqId = fs.flushSequenceId;
  } else {
    throw new IOException("Could not bulk load with an assigned sequential ID because the " +
        "flush didn't run. Reason for not flushing: " + fs.failureReason);
  }
}
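The three-way decision in the bulk load excerpt can be restated as a small sketch: a successful flush and an empty MemStore both yield a usable sequence ID, and anything else aborts the load. BulkLoadSeqIdSketch and its simplified Result enum are hypothetical; the real FlushResult.Result may have more values than shown here.

```java
import java.io.IOException;

// Hypothetical restatement of the sequence ID choice during bulk load; not HBase code.
final class BulkLoadSeqIdSketch {
    /** Simplified outcome enum, assumed for illustration. */
    enum Result { FLUSHED, CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH }

    private BulkLoadSeqIdSketch() {}

    static long chooseSeqId(Result result, long flushSequenceId, String failureReason)
            throws IOException {
        switch (result) {
            case FLUSHED:
            case CANNOT_FLUSH_MEMSTORE_EMPTY:
                // Either way the flush result carries a sequence ID that can be assigned.
                return flushSequenceId;
            default:
                throw new IOException("Could not bulk load with an assigned sequential ID "
                    + "because the flush didn't run. Reason for not flushing: " + failureReason);
        }
    }
}
```

Treating an empty MemStore as acceptable makes sense here: there was nothing to flush, so the sequence ID returned with the result can still be used.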
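5. Taking a snapshot of a table
When a table snapshot is taken, the MemStore of each Region belonging to that table is flushed. This work is done by RegionSnapshotTask in the FlushSnapshotSubprocedure class: in its call() method, when snapshotSkipFlush is false, it calls HRegion's flushcache() to flush the MemStore: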
if (snapshotSkipFlush) {
  LOG.debug("take snapshot without flush memstore first");
} else {
  LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
  region.flushcache();
}
That covers the main points at which a MemStore flush is initiated and the conditions involved. Corrections are welcome.



