栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC Hbase(五)项目代码分析- flush

2021SC@SDUSC Hbase(五)项目代码分析- flush

2021SC@SDUSC

        

        在第四篇文章中我们探究了cacheFlusher怎么初始化,现在我们看一下cacheFlusher如何处理flush请求。

        通过前一篇的分析我们知道,在MemStoreFlusher内部,存在了两个存储flush请求及其HRegion封装类的队列和集合:flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法:

public void requestFlush(HRegion r) {
   synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
   }
}

        requestFlush()方法添加了一个flush region的请求至MemStoreFlusher内部队列。主要流程如下:
        1、用synchronized对regionsInQueue进行线程同步

        2、判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中存在则返回

        3、将HRegion类型的r封装成FlushRegionEntry类型的fqe

        4、将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合

        5、将flush请求FlushRegionEntry添加到flushQueue队列

        这里flush的请求已经被添加至flushQueue队列,相当于生产者生产了产品等待消费者消费,而消费者是由FlushHandler线程担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,但是在研究其run()方法前,我们先看下flushQueue中存储的是什么:

        我们再回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。FlushQueueEntry是继承了Delayed的一个接口(Delayed的另一个实现类是WakeupFlushThread)。不过在介绍它们之前,我们先看一下flushQueue对应的队列类型---Java中的DelayQueue。

        DelayQueue是一个无界的BlockingQueue,其内部存储的是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且队列成员有序(按照延迟到期时间的长短排序)。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期。

        既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类必须能够排序,并且需实现上述getDelay()方法,判断队内成员是否到期。

        接下来,我们来研究下WakeupFlushThread和FlushRegionEntry。

        WakeupFlushThread代码如下:

static class WakeupFlushThread implements FlushQueueEntry {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }
 
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }
 
    @Override
    public boolean equals(Object obj) {
      return (this == obj);
    }
}

        此方法是作为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的。

        FlushRegionEntry定义如下:

  static class FlushRegionEntry implements FlushQueueEntry {
	private final HRegion region;
    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;
 
    FlushRegionEntry(final HRegion r) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
    }
 
    
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }
 
    
    public int getRequeueCount() {
      return this.requeueCount;
    }
 
    
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare region's hash code
      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return hashCode() - otherEntry.hashCode();
    }
 
    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
    }
 
    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }
 
   @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      Delayed other = (Delayed) obj;
      return compareTo(other) == 0;
    }
  }
}

        FlushRegionEntry类的对象在初始化时,createTime设为当前时间,whenToExpire也为设置当前时间,判断是否到期的getDelay()方法为whenToExpire减去createTime(也就是说首次入列即可出列)。另外,它的compareTo()方法,也是根据getDelay()方法来决定顺序,whenToExpire时间一致的话,根据hashCode()来排序。另外,此类还提供了类似重新入列的方法,重新入列次数requeueCount加1,whenToExpire设置为当前时间加上参数when。

        最后,我们看一下flush请求的实际处理流程:

@Override
public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        
        try {
          
          wakeupPending.set(false);

          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          
          if (fqe == null || fqe instanceof WakeupFlushThread) {
          
            if (isAboveLowWaterMark()) {

              LOG.debug("Flush thread woke up because memory above low water="
                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
              
              if (!flushoneForGlobalPressure()) {
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              wakeupFlushThread();
            }
            continue;
          }
          
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }

      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }
 
      wakeUpIfBlocking();
      
      LOG.info(getName() + " exiting");
}

        处理流程如下;

        1、HRegionServer未停止时,此方法一直运行

        2、将wakeupPending设置为false

        3、从flushQueue队列中拉取一个FlushQueueEntry。,如果其为空,或者为WakeupFlushThread,则通过isAboveLowWaterMark()方法判断全局MemStore大小,如果高于限制值的低水平线就调用flushoneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore大小,并入列另一个令牌,以使该线程之后再次被唤醒;如果不为空且不为WakeupFlushThread的话,转化为FlushRegionEntry类型:调用flushRegion()方法,结果为false的话,跳出循环

        4、循环结束清空regionsInQueue和flushQueue

        5、唤醒所有的等待者,让它们能够看到close标志

        6、记录日志

        综上,WakeupFlushThread的主要是作为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread还有其他的作用,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread按照一定策略选择该RegionServer上的一个Region刷新MemStore,以缓解RegionServer内存压力。

        如有错误,欢迎指正。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354774.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号