
2021SC@SDUSC HBase (10) Project Code Analysis: LruBlockCache

Contents

1. Introduction

2. Analysis of the Eviction Implementation

3. Fetching Cached Data from LruBlockCache


1. Introduction

        Going to an HFile on every read would be very inefficient, especially for small random reads. To improve I/O performance, HBase therefore provides a block-caching mechanism, BlockCache, and LruBlockCache is one of its implementations.

        This post continues the analysis from the previous article.

2. Analysis of the Eviction Implementation

        In the previous article we saw, inside the evict flow, that the main thread wakes the EvictionThread by calling notifyAll on the eviction thread object. So where does that thread wait? Let's look at its run method:

    @Override
    public void run() {
      enteringRun = true;
      while (this.go) {
        synchronized(this) {
          try {
            this.wait(1000 * 10);
          } catch(InterruptedException e) {
            LOG.warn("Interrupted eviction thread ", e);
            Thread.currentThread().interrupt();
          }
        }
        LruBlockCache cache = this.cache.get();
        if (cache == null) break;
        cache.evict();
      }
    }
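The wake-up side of this handshake is not shown above. The following is a minimal, self-contained sketch of it, with illustrative names rather than the actual HBase classes; it also adds a wakeRequested flag to close the lost-wakeup race that the real code simply tolerates via its 10-second wait timeout:

```java
// Sketch of the wait/notifyAll handshake between the main thread and the
// eviction thread. EvictionThreadSketch is a stand-in, not the HBase class.
public class EvictionWakeupSketch {
    public static class EvictionThreadSketch extends Thread {
        public volatile boolean go = true;
        public volatile int evictions = 0;
        private boolean wakeRequested = false; // guarded by 'this'

        @Override
        public void run() {
            while (go) {
                synchronized (this) {
                    try {
                        // Skip the wait if a wake-up already arrived, so a
                        // notifyAll sent before we got here is not lost.
                        if (!wakeRequested) {
                            this.wait(10_000);
                        }
                        wakeRequested = false;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (!go) {
                    break;
                }
                evictions++; // stands in for cache.evict()
            }
        }

        // Called from the main thread, mirroring the notifyAll in evict().
        public void triggerEviction() {
            synchronized (this) {
                wakeRequested = true;
                this.notifyAll();
            }
        }

        public void shutdown() throws InterruptedException {
            go = false;
            triggerEviction(); // wake the thread so it can see go == false
            join(15_000);
        }
    }
}
```

Because wait releases the monitor, triggerEviction can always enter the synchronized block and deliver its notifyAll while the eviction thread is parked.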

        The thread waits for up to 10 seconds, releasing the object lock while it does; once notifyAll() is called (or the timeout expires), it continues with the eviction flow below:

  
  void evict() {
 
    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;
 
    try {
    	
      evictionInProgress = true;
      
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();
 
      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }
 
      if(bytesToFree <= 0) return;
 
      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
          memorySize());
 
      // Scan entire map putting into appropriate buckets
      for(LruCachedBlock cachedBlock : map.values()) {
        switch(cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }
 
      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue =
          new PriorityQueue<>(3);
 
        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);
 
        int remainingBuckets = 3;
 
        BlockBucket bucket;
        while((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if(overflow > 0) {
            long bucketBytesToFree = Math.min(overflow,
                (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
 
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }

        The logic is as follows:

  1. Acquire the eviction lock with tryLock, ensuring only one eviction runs at a time
  2. Set the evictionInProgress flag to true to mark that an eviction is in progress
  3. Read the current cache size, currentSize
  4. Compute the number of bytes to free, bytesToFree = currentSize - minSize(); if bytesToFree <= 0, return immediately
  5. Instantiate the three priority buckets: single, multi, and memory
  6. Scan the entire cache map, placing each block into the bucket matching its priority
  7. If forceInMemory is set or the memory factor exceeds 99.9%:
    1. If bytesToFree exceeds the combined size of the single and multi buckets, empty both of them and free the remainder from the memory bucket
    2. Otherwise nothing is evicted from the memory bucket; blocks are freed from single and multi so that the bytes remaining in them approach a 1:2 ratio
  8. Otherwise, poll the three buckets from a priority queue and free an equal share from each in turn
  9. Reset the flag
  10. Release the lock
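Steps 7.2 and 8 are pure arithmetic, so they can be isolated into small functions. The sketch below (illustrative names, not the HBase implementation itself) reproduces the 1:2 balancing and the equal-share division:

```java
// Pure-arithmetic sketch of eviction steps 7.2 and 8. Names are illustrative.
public class EvictionMathSketch {

    /**
     * Step 7.2: bytesToFree <= s + m, so nothing leaves the memory bucket.
     * Returns {bytesFromSingle, bytesFromMulti}, chosen so that the bytes
     * left in the single and multi buckets approach a 1:2 ratio.
     */
    public static long[] split(long s, long m, long bytesToFree) {
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
            // single is already at most 1/3 of what remains: free only multi
            return new long[] {0, bytesToFree};
        } else if (3 * m <= 2 * bytesRemain) {
            // multi is already at most 2/3 of what remains: free only single
            return new long[] {bytesToFree, 0};
        } else {
            // shrink single toward bytesRemain / 3; multi absorbs the rest
            long fromSingle = s - bytesRemain / 3;
            return new long[] {fromSingle, bytesToFree - fromSingle};
        }
    }

    /**
     * Step 8: each polled bucket frees at most its overflow, capped at an
     * equal share of the bytes still outstanding across remaining buckets.
     */
    public static long fairShare(long overflow, long bytesToFree,
            long bytesFreed, int remainingBuckets) {
        return Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
    }
}
```

For example, with s = 100, m = 200 and bytesToFree = 90, split frees 30 bytes from single and 60 from multi, leaving 70 and 140 bytes, exactly the 1:2 target ratio.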

3. Fetching Cached Data from LruBlockCache

        Fetching cached data is implemented in getBlock:

  
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    LruCachedBlock cb = map.get(cacheKey);
    if (cb == null) {
      if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
      // If there is another block cache then try and read there.
      // However if this is a retry ( second time in double checked locking )
      // And it's already a miss then the l2 will also be a miss.
      if (victimHandler != null && !repeat) {
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
 
        // Promote this to L1.
        if (result != null && caching) {
          cacheBlock(cacheKey, result,  false,  true);
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }

        The logic is as follows:

  1. Try to fetch the block directly from LruBlockCache's map
  2. On a miss, if victimHandler is set and this is not a repeat lookup, fetch the block through victimHandler.getBlock and, when caching is requested, promote it into the LruBlockCache
  3. Whenever data is found, update the hit/miss statistics; on an L1 hit, the block's access method is also called, which updates its access time and may upgrade BlockPriority.SINGLE to BlockPriority.MULTI
  4. Return the result
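The miss-then-victim-cache path above can be mimicked with plain maps. A minimal sketch (illustrative names and types, not the HBase classes) of the lookup-then-promote flow:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of getBlock's two-level lookup: an L1 miss falls through to a
// victim cache (L2), and an L2 hit is promoted back into L1 when caching
// is requested. Plain maps stand in for the real cache structures.
public class TwoLevelLookupSketch {
    public final Map<String, byte[]> l1 = new HashMap<>();
    public final Map<String, byte[]> l2 = new HashMap<>(); // "victimHandler"
    public long hits = 0;
    public long misses = 0;

    public byte[] getBlock(String key, boolean caching, boolean repeat) {
        byte[] block = l1.get(key);
        if (block != null) {
            hits++;
            return block;
        }
        if (repeat) {
            // second pass of a double-checked lookup: L2 already missed too
            return null;
        }
        misses++;
        byte[] victim = l2.get(key);
        if (victim != null && caching) {
            l1.put(key, victim); // promote the L2 hit into L1
        }
        return victim;
    }
}
```

After the first lookup promotes the block, every subsequent call is served straight from l1, which is the whole point of the promotion step.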

        That concludes this analysis.

Source: https://www.mshxw.com/it/629011.html