2021SC@SDUSC
This article looks at two questions: how an HRegion is chosen for flushing to relieve MemStore pressure, and how an HRegion flush is initiated.
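In the previous article we saw that the flush handler thread may call flushOneForGlobalPressure(). That method picks one HRegion according to a fixed policy and flushes its MemStore. This reduces total MemStore usage and prevents the problems that excessive memory use would cause. This time we focus on flushOneForGlobalPressure():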
```java
private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();
    Set<HRegion> excludedRegions = new HashSet<HRegion>();
    boolean flushedOne = false;
    while (!flushedOne) {
        // Find the biggest region that doesn't have too many storefiles
        // (might be null!)
        HRegion bestFlushableRegion = getBiggestMemstoreRegion(
            regionsBySize, excludedRegions, true);
        // Find the biggest region, total, even if it might have too many flushes.
        HRegion bestAnyRegion = getBiggestMemstoreRegion(
            regionsBySize, excludedRegions, false);
        if (bestAnyRegion == null) {
            LOG.error("Above memory mark but there are no flushable regions!");
            return false;
        }
        HRegion regionToFlush;
        if (bestFlushableRegion != null &&
            bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
            // Even if it's not supposed to be flushed, pick a region if it's more than twice
            // as big as the best flushable one - otherwise when we're under pressure we make
            // lots of little flushes and cause lots of compactions, etc, which just makes
            // life worse!
            if (LOG.isDebugEnabled()) {
                LOG.debug("Under global heap pressure: " +
                    "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
                    "store files, but is " +
                    StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
                    " vs best flushable region's " +
                    StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
                    ". Choosing the bigger.");
            }
            regionToFlush = bestAnyRegion;
        } else {
            if (bestFlushableRegion == null) {
                regionToFlush = bestAnyRegion;
            } else {
                regionToFlush = bestFlushableRegion;
            }
        }
        Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
        flushedOne = flushRegion(regionToFlush, true);
        if (!flushedOne) {
            LOG.info("Excluding unflushable region " + regionToFlush +
                " - trying to find a different region to flush.");
            excludedRegions.add(regionToFlush);
        }
    }
    return true;
}
```
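The method works roughly as follows:
- Get the online regions on the RegionServer, sorted by memstoreSize in descending order, as regionsBySize.
- Create the set of excluded regions, excludedRegions.
- Set flushedOne to false, then repeat the following steps until a flush succeeds.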
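- Walk regionsBySize and pick the region with the largest MemStore that does not have too many store files, as bestFlushableRegion. A region is skipped if it is in excludedRegions, if its write state shows it is already flushing, if writes are not enabled for it, or if store file counts are being checked and it has too many store files. The first region that passes these checks is returned.
- Walk regionsBySize again and pick the region with the largest MemStore even if it has too many store files, as bestAnyRegion. A region is skipped if it is in excludedRegions, if it is already flushing, or if writes are not enabled for it. The first region that passes these checks is returned.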
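- If we are above the upper memory mark but there is no flushable region at all (bestAnyRegion is null), log an error and return false.
- Choose the region to flush: bestAnyRegion if bestFlushableRegion exists and bestAnyRegion's MemStore is more than twice as large as bestFlushableRegion's (this avoids many small flushes that would trigger many compactions); otherwise bestFlushableRegion, or bestAnyRegion if no flushable region was found.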
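- Check that the chosen region's memstoreSize is greater than zero.
- Call flushRegion() to flush the MemStore of that single region.
- If the flush fails, add the region to excludedRegions so that it is not chosen again.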
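The selection policy above can be sketched as a small, self-contained Java model. RegionStub, its fields, and the tryFlush callback are hypothetical stand-ins for HRegion, its writestate flags, isTooManyStoreFiles() and flushRegion(); this is a simplified illustration under those assumptions, not the HBase implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified model of the policy in flushOneForGlobalPressure().
// RegionStub is a hypothetical stand-in for HRegion and its write state.
class GlobalPressureSelector {

    static final class RegionStub {
        final String name;
        final long memstoreSize;          // bytes held in the MemStore
        final boolean flushing;           // like writestate.flushing
        final boolean writesEnabled;      // like writestate.writesEnabled
        final boolean tooManyStoreFiles;  // like isTooManyStoreFiles(region)

        RegionStub(String name, long memstoreSize, boolean flushing,
                   boolean writesEnabled, boolean tooManyStoreFiles) {
            this.name = name;
            this.memstoreSize = memstoreSize;
            this.flushing = flushing;
            this.writesEnabled = writesEnabled;
            this.tooManyStoreFiles = tooManyStoreFiles;
        }
    }

    // Like getBiggestMemstoreRegion(): scan from the largest MemStore down and
    // return the first region that passes the filters, or null if none does.
    static RegionStub biggest(List<RegionStub> bySizeDesc, Set<RegionStub> excluded,
                              boolean checkStoreFileCount) {
        for (RegionStub r : bySizeDesc) {
            if (excluded.contains(r)) continue;
            if (r.flushing || !r.writesEnabled) continue;
            if (checkStoreFileCount && r.tooManyStoreFiles) continue;
            return r;
        }
        return null;
    }

    // Picks bestAny if it is more than twice as big as bestFlushable,
    // otherwise bestFlushable (falling back to bestAny when there is none).
    static RegionStub choose(List<RegionStub> regions, Set<RegionStub> excluded) {
        List<RegionStub> bySizeDesc = new ArrayList<>(regions);
        bySizeDesc.sort(Comparator.comparingLong((RegionStub r) -> r.memstoreSize).reversed());
        RegionStub bestFlushable = biggest(bySizeDesc, excluded, true);
        RegionStub bestAny = biggest(bySizeDesc, excluded, false);
        if (bestAny == null) {
            return null;  // the real method logs an error and returns false here
        }
        if (bestFlushable != null && bestAny.memstoreSize > 2 * bestFlushable.memstoreSize) {
            return bestAny;
        }
        return bestFlushable != null ? bestFlushable : bestAny;
    }

    // Retry loop: a region whose flush fails is excluded and another one is picked.
    static RegionStub flushOne(List<RegionStub> regions, Predicate<RegionStub> tryFlush) {
        Set<RegionStub> excluded = new HashSet<>();
        while (true) {
            RegionStub pick = choose(regions, excluded);
            if (pick == null || tryFlush.test(pick)) {
                return pick;
            }
            excluded.add(pick);
        }
    }
}
```

For example, with MemStores of 300, 100 and 50 bytes where the 300-byte region has too many store files, choose() returns the 300-byte region because 300 > 2 × 100; if that region held only 150 bytes, the 100-byte flushable region would be chosen instead.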
That is flushOneForGlobalPressure(), which selects one HRegion by a fixed policy and flushes its MemStore to relieve MemStore pressure. Next we look at how an HRegion flush is initiated, starting with the single-argument flushRegion():
```java
private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
        if (fqe.isMaximumWait(this.blockingWaitTime)) {
            LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
                "ms on a compaction to clean up 'too many store files'; waited " +
                "long enough... proceeding with flush of " +
                region.getRegionNameAsString());
        } else {
            // If this is first time we've been put off, then emit a log message.
            if (fqe.getRequeueCount() <= 0) {
                // Note: We don't impose blockingStoreFiles constraint on meta regions
                LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
                    "store files; delaying flush up to " + this.blockingWaitTime + "ms");
                if (!this.server.compactSplitThread.requestSplit(region)) {
                    try {
                        this.server.compactSplitThread.requestSystemCompaction(
                            region, Thread.currentThread().getName());
                    } catch (IOException e) {
                        LOG.error(
                            "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                            RemoteExceptionHandler.checkIOException(e));
                    }
                }
            }
            // Put back on the queue. Have it come back out of the queue
            // after a delay of this.blockingWaitTime / 100 ms.
            this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
            // Tell a lie, it's not flushed but it's ok
            return true;
        }
    }
    return flushRegion(region, false);
}
```
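The flow is as follows:
- If the region is not a meta region and has too many store files:
  - isMaximumWait() checks how long the entry has been waiting. If it has already waited longer than blockingWaitTime, log this and go straight to the final step below to perform the flush.
  - Otherwise, if this is the first deferral, log a warning and request a split of the HRegion; if the split request is not accepted, request a system compaction instead.
  - Put fqe back on flushQueue with a delay of blockingWaitTime / 100 ms (900 ms with the default blockingWaitTime of 90000 ms); it is taken off the queue again once the delay expires.
  - Since the flush has only been postponed and its outcome is not yet known, return true (the code comment calls this "telling a lie").
- Otherwise, call the two-argument flushRegion() to have the HRegion perform the flush.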
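The deferral logic can be illustrated with java.util.concurrent.DelayQueue, assuming (as the "come back out of the queue after a delay" comment suggests) that flushQueue is a delay queue. FlushEntry and TooManyStoreFilesGate are hypothetical, simplified stand-ins for FlushRegionEntry and the first half of the single-argument flushRegion(). The split/compaction request on the first deferral is omitted, and unlike the real method, flushNowOrRequeue() returns false when it requeues instead of returning true.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for FlushRegionEntry.
class FlushEntry implements Delayed {
    final String region;
    final long createTime = System.currentTimeMillis();  // set once, on first request
    private long whenToExpire = createTime;              // ready immediately at first
    private int requeueCount = 0;

    FlushEntry(String region) {
        this.region = region;
    }

    // True once the entry has been waiting longer than maximumWait ms.
    boolean isMaximumWait(long maximumWait) {
        return System.currentTimeMillis() - createTime > maximumWait;
    }

    // Delay the entry by 'when' ms from now and count the requeue.
    FlushEntry requeue(long when) {
        whenToExpire = System.currentTimeMillis() + when;
        requeueCount++;
        return this;
    }

    int getRequeueCount() {
        return requeueCount;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

// Hypothetical sketch of the "too many store files" gate in flushRegion(fqe).
class TooManyStoreFilesGate {
    // Returns true if the caller should flush now, false if the entry was requeued.
    static boolean flushNowOrRequeue(FlushEntry fqe, boolean isMeta, boolean tooManyStoreFiles,
                                     long blockingWaitTime, DelayQueue<FlushEntry> queue) {
        if (isMeta || !tooManyStoreFiles) {
            return true;  // meta regions are never held back by the store file limit
        }
        if (fqe.isMaximumWait(blockingWaitTime)) {
            return true;  // waited long enough for a compaction; flush anyway
        }
        // The real code requests a split (or a compaction) on the first deferral here.
        queue.add(fqe.requeue(blockingWaitTime / 100));
        return false;
    }
}
```

With a blockingWaitTime of 90000 ms the requeue delay is 900 ms, so an entry is put back at most about a hundred times before isMaximumWait() lets the flush through.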
Next is the two-argument flushRegion():
```java
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
        FlushRegionEntry fqe = this.regionsInQueue.remove(region);
        // Use the start time of the FlushRegionEntry if available
        if (fqe != null) {
            startTime = fqe.createTime;
        }
        if (fqe != null && emergencyFlush) {
            // Need to remove from region from delay queue. When NOT an
            // emergencyFlush, then item was removed via a flushQueue.poll.
            flushQueue.remove(fqe);
        }
    }
    if (startTime == 0) {
        // Avoid getting the system time unless we don't have a FlushRegionEntry;
        // shame we can't capture the time also spent in the above synchronized
        // block
        startTime = EnvironmentEdgeManager.currentTime();
    }
    lock.readLock().lock();
    try {
        notifyFlushRequest(region, emergencyFlush);
        HRegion.FlushResult flushResult = region.flushcache();
        boolean shouldCompact = flushResult.isCompactionNeeded();
        // We just want to check the size
        boolean shouldSplit = region.checkSplit() != null;
        if (shouldSplit) {
            this.server.compactSplitThread.requestSplit(region);
        } else if (shouldCompact) {
            server.compactSplitThread.requestSystemCompaction(
                region, Thread.currentThread().getName());
        }
        if (flushResult.isFlushSucceeded()) {
            long endTime = EnvironmentEdgeManager.currentTime();
            server.metricsRegionServer.updateFlushTime(endTime - startTime);
        }
    } catch (DroppedSnapshotException ex) {
        // Cache flush can fail in a few places. If it fails in a critical
        // section, we get a DroppedSnapshotException and a replay of wal
        // is required. Currently the only way to do this is a restart of
        // the server. Abort because hdfs is probably bad (Hbase-644 is a case
        // where hdfs was bad but passed the hdfs check).
        server.abort("Replay of WAL required. Forcing server shutdown", ex);
        return false;
    } catch (IOException ex) {
        LOG.error("Cache flush failed" +
            (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
            RemoteExceptionHandler.checkIOException(ex));
        if (!server.checkFileSystem()) {
            return false;
        }
    } finally {
        lock.readLock().unlock();
        wakeUpIfBlocking();
    }
    return true;
}
```
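The flow is as follows:
- Remove the HRegion's entry from regionsInQueue.
- If an entry was found, use its creation time as the flush start time.
- If this is an emergency flush, also remove the fqe from flushQueue; for a non-emergency flush, the fqe has already been removed by flushQueue.poll().
- If the start time is still 0 (no entry was found), use the current time as the start time.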
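- Acquire the read lock.
- Notify the registered flush request listeners of the flush type.
- Call HRegion's flushcache() to flush the MemStore and obtain the flush result.
- Use the flush result to decide whether a compaction is needed (flag shouldCompact).
- Call HRegion's checkSplit() to decide whether a split is needed (flag shouldSplit).
- Act on the two flags: request a split if one is needed; otherwise request a system compaction if one is needed. A split takes priority, and the two are never both requested.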
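- If the flush succeeded, take the end time, compute the elapsed time, and record it in the RegionServer's flush-time metric.
- On a DroppedSnapshotException, abort the RegionServer (a WAL replay is required) and return false; on any other IOException, log the error and return false only if the file system check fails.
- Finally, release the read lock and wake up any other threads that are blocked.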
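The bookkeeping around the flush in the two-argument flushRegion() (choosing the start time, letting a split take priority over a compaction, and releasing the read lock and waking blocked threads in finally) can be sketched as follows. FlushBookkeeping, FollowUp, timedFlush() and isReadLocked() are hypothetical names; the flush itself is an arbitrary Runnable, and the real method's exception handling is left out.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the bookkeeping in the two-argument flushRegion().
class FlushBookkeeping {
    enum FollowUp { SPLIT, COMPACT, NONE }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Stand-in for the monitor that threads blocked by memory pressure wait on.
    private final Object blockSignal = new Object();

    // A split takes priority; a compaction is requested only when no split is needed.
    static FollowUp followUp(boolean shouldSplit, boolean shouldCompact) {
        if (shouldSplit) {
            return FollowUp.SPLIT;
        }
        return shouldCompact ? FollowUp.COMPACT : FollowUp.NONE;
    }

    // Use the queue entry's creation time when there is one (0 means no entry).
    static long startTime(long entryCreateTime, long now) {
        return entryCreateTime != 0 ? entryCreateTime : now;
    }

    // Runs the flush under the read lock and returns the elapsed time in ms,
    // the value that would feed the flush-time metric. The finally block always
    // releases the lock and wakes threads waiting on blockSignal, even on failure.
    long timedFlush(long startTime, Runnable flush) {
        lock.readLock().lock();
        try {
            flush.run();
            return System.currentTimeMillis() - startTime;
        } finally {
            lock.readLock().unlock();
            synchronized (blockSignal) {
                blockSignal.notifyAll();
            }
        }
    }

    boolean isReadLocked() {
        return lock.getReadLockCount() > 0;
    }
}
```

Using a read lock rather than a plain mutex means several flush handler threads can run flushes concurrently while still excluding any code that takes the write lock.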
That's all.
Corrections are welcome.



