2021SC@SDUSC
在第四篇文章中我们探究了cacheFlusher怎么初始化,现在我们看一下cacheFlusher如何处理flush请求。
通过前一篇的分析我们知道,在MemStoreFlusher内部,存在了两个存储flush请求及其HRegion封装类的队列和集合:flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法:
public void requestFlush(HRegion r) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
FlushRegionEntry fqe = new FlushRegionEntry(r);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
}
requestFlush()方法添加了一个flush region的请求至MemStoreFlusher内部队列。主要流程如下:
1、用synchronized对regionsInQueue进行线程同步
2、判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中存在则返回
3、将HRegion类型的r封装成FlushRegionEntry类型的fqe
4、将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合
5、将flush请求FlushRegionEntry添加到flushQueue队列
这里flush的请求已经被添加至flushQueue队列,相当于生产者生产了产品等待消费者消费,而消费者是由FlushHandler线程担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,但是在研究其run()方法前,我们先看下flushQueue中存储的是什么:
我们再回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。FlushQueueEntry是继承了Delayed的一个接口(Delayed的另一个实现类是WakeupFlushThread)。不过在介绍它们之前,我们先看一下flushQueue对应的队列类型---Java中的DelayQueue。
DelayQueue是一个无界的BlockingQueue,其内部存储的是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且队列成员有序(按照延迟到期时间的长短排序)。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期。
既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类必须能够排序,并且需实现上述getDelay()方法,判断队内成员是否到期。
接下来,我们来研究下WakeupFlushThread和FlushRegionEntry。
WakeupFlushThread代码如下:
static class WakeupFlushThread implements FlushQueueEntry {
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return -1;
}
@Override
public boolean equals(Object obj) {
return (this == obj);
}
}
此方法是作为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的。
FlushRegionEntry定义如下:
static class FlushRegionEntry implements FlushQueueEntry {
private final HRegion region;
private final long createTime;
private long whenToExpire;
private int requeueCount = 0;
FlushRegionEntry(final HRegion r) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
}
public boolean isMaximumWait(final long maximumWait) {
return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
}
public int getRequeueCount() {
return this.requeueCount;
}
public FlushRegionEntry requeue(final long when) {
this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
this.requeueCount++;
return this;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
// Delay is compared first. If there is a tie, compare region's hash code
int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
if (ret != 0) {
return ret;
}
FlushQueueEntry otherEntry = (FlushQueueEntry) other;
return hashCode() - otherEntry.hashCode();
}
@Override
public String toString() {
return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
}
@Override
public int hashCode() {
int hash = (int) getDelay(TimeUnit.MILLISECONDS);
return hash ^ region.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Delayed other = (Delayed) obj;
return compareTo(other) == 0;
}
}
}
FlushRegionEntry类的对象在初始化时,createTime设为当前时间,whenToExpire也为设置当前时间,判断是否到期的getDelay()方法为whenToExpire减去createTime(也就是说首次入列即可出列)。另外,它的compareTo()方法,也是根据getDelay()方法来决定顺序,whenToExpire时间一致的话,根据hashCode()来排序。另外,此类还提供了类似重新入列的方法,重新入列次数requeueCount加1,whenToExpire设置为当前时间加上参数when。
最后,我们看一下flush请求的实际处理流程:
@Override
public void run() {
while (!server.isStopped()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false);
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null || fqe instanceof WakeupFlushThread) {
if (isAboveLowWaterMark()) {
LOG.debug("Flush thread woke up because memory above low water="
+ StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
if (!flushoneForGlobalPressure()) {
Thread.sleep(1000);
wakeUpIfBlocking();
}
wakeupFlushThread();
}
continue;
}
FlushRegionEntry fre = (FlushRegionEntry) fqe;
if (!flushRegion(fre)) {
break;
}
} catch (InterruptedException ex) {
continue;
} catch (ConcurrentModificationException ex) {
continue;
} catch (Exception ex) {
LOG.error("Cache flusher failed for entry " + fqe, ex);
if (!server.checkFileSystem()) {
break;
}
}
}
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}
wakeUpIfBlocking();
LOG.info(getName() + " exiting");
}
处理流程如下;
1、HRegionServer未停止时,此方法一直运行
2、将wakeupPending设置为false
3、从flushQueue队列中拉取一个FlushQueueEntry。,如果其为空,或者为WakeupFlushThread,则通过isAboveLowWaterMark()方法判断全局MemStore大小,如果高于限制值的低水平线就调用flushoneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore大小,并入列另一个令牌,以使该线程之后再次被唤醒;如果不为空且不为WakeupFlushThread的话,转化为FlushRegionEntry类型:调用flushRegion()方法,结果为false的话,跳出循环
4、循环结束清空regionsInQueue和flushQueue
5、唤醒所有的等待者,让它们能够看到close标志
6、记录日志
综上,WakeupFlushThread的主要是作为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread还有其他的作用,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread按照一定策略选择该RegionServer上的一个Region刷新MemStore,以缓解RegionServer内存压力。
如有错误,欢迎指正。



