
2021SC@SDUSC HBase (12) Project Code Analysis: BucketCache

Contents

1. Preface

2. BucketCache

3. CombinedBlockCache


1. Preface

        The previous two articles covered LruBlockCache and external caches. This article analyzes the implementation of BucketCache, as well as CombinedBlockCache, which combines LruBlockCache and BucketCache.

2. BucketCache

        First, a look at some of BucketCache's fields:

final IOEngine ioEngine; // where blocks are ultimately cached

final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; // temporarily holds blocks before they are written to the cache

ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; // records block metadata

// Multiple writer threads are responsible for writing blocks into the cache;
// each writer thread has its own BlockingQueue.
final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
@VisibleForTesting
final WriterThread[] writerThreads;

        When a BucketCache is instantiated, the configured number of writer threads are started as background threads, along with a statistics thread that logs cache statistics every five minutes.

        Next, the cacheBlock path:

        

public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (!cacheEnabled) {
      return;
    }

    if (backingMap.containsKey(cacheKey)) {
      return;
    }

    
    RAMQueueEntry re =
        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      failedBlockAdditions.incrementAndGet();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }

  The logic is as follows:

  1. If backingMap already contains the given cacheKey, return immediately.
  2. Build a RAMQueueEntry from the arguments and put it into ramCache; if ramCache already contains the key, return.
  3. Select a writer queue by taking the cacheKey's hash value modulo the number of writer queues, then offer the new RAMQueueEntry to that queue.
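The queue-selection step above masks the hash's sign bit before taking the remainder, so negative hash codes still map to a valid queue index. A minimal standalone sketch (the class and method names here are illustrative, not HBase's):

```java
public class QueueSelection {
    // Map an arbitrary hash code to a queue index in [0, queueCount).
    // Masking with 0x7FFFFFFF clears the sign bit, so negative hash
    // codes (including Integer.MIN_VALUE) never yield a negative index.
    static int selectQueue(int hashCode, int queueCount) {
        return (hashCode & 0x7FFFFFFF) % queueCount;
    }

    public static void main(String[] args) {
        assert selectQueue(10, 4) == 2;
        assert selectQueue(-1, 4) == 3;                 // 0x7FFFFFFF % 4
        assert selectQueue(Integer.MIN_VALUE, 4) == 0;  // sign bit masked away
        System.out.println("ok");
    }
}
```

Using `Math.abs(hashCode) % n` instead would be a subtle bug: `Math.abs(Integer.MIN_VALUE)` is still negative, which is why the bitmask form is used.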

This method only places the block to be cached into a writer queue; the writer threads then perform the actual writes into the cache. Next is the writer thread's write path:

       

public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

The logic is as follows:

  1. Take all pending RAMQueueEntry objects from the queue.
  2. Call doDrain to write the entries into the cache, record each block's location in backingMap, and then remove the corresponding cacheKey entries from ramCache.
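The drain pattern above can be sketched with standard `BlockingQueue` primitives: block until at least one entry arrives, then drain whatever else is queued in one batch. This is a simplified stand-in (plain `String` keys instead of real cache entries, a map write in place of IOEngine I/O):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class DrainSketch {
    static final Map<String, String> ramCache = new ConcurrentHashMap<>();
    static final Map<String, String> backingMap = new ConcurrentHashMap<>();

    // Mimics getRAMQueueEntries + doDrain: take at least one entry (blocking),
    // grab everything else without waiting, then "persist" the batch and
    // drop the staging copies from ramCache.
    static void drainOnce(BlockingQueue<String> queue) throws InterruptedException {
        List<String> entries = new ArrayList<>();
        entries.add(queue.take());   // block until at least one entry exists
        queue.drainTo(entries);      // then batch up whatever else is queued
        for (String key : entries) {
            backingMap.put(key, ramCache.get(key)); // record "cached" data/location
            ramCache.remove(key);                   // staging copy no longer needed
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        ramCache.put("block-1", "data-1");
        ramCache.put("block-2", "data-2");
        queue.offer("block-1");
        queue.offer("block-2");
        drainOnce(queue);
        assert backingMap.size() == 2;
        assert ramCache.isEmpty();
        System.out.println("ok");
    }
}
```

Batching the drain is the point: each wakeup of a writer thread amortizes the write path over every entry queued since the last drain.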

Writing a block into the cache is implemented by RAMQueueEntry's writeToCache:

public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          HFileBlock block = (HFileBlock) data;
          ByteBuffer sliceBuf = block.getBufferReadonlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
            len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          block.serializeExtraInfo(extraInfoBuffer);
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // free it in bucket allocator
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }

      realCacheSize.addAndGet(len);
      return bucketEntry;
    }

BucketAllocator manages the cache space. At initialization it divides the cache into many fixed-size buckets; each bucket's capacity is four times the largest configured bucket size, and each bucket tracks its offset into the cache space. Buckets of each size class are additionally tracked through bucketSizeInfos:

final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
    private int sizeIndex;
}
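Allocation first rounds the requested block size up to one of these size classes. A minimal sketch of that rounding, with illustrative size values (HBase's actual size classes differ and are configurable):

```java
public class BucketSizing {
    // Illustrative size classes in bytes, sorted ascending.
    static final int[] BUCKET_SIZES = {4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024};

    // Return the smallest size class >= blockSize, or -1 if no class fits
    // (BucketCache throws BucketAllocatorException in that case).
    static int roundUpToBucketSize(int blockSize) {
        for (int size : BUCKET_SIZES) {
            if (blockSize <= size) return size;
        }
        return -1;
    }

    public static void main(String[] args) {
        assert roundUpToBucketSize(5000) == 8 * 1024;      // rounds up, not down
        assert roundUpToBucketSize(64 * 1024) == 64 * 1024; // exact fit
        assert roundUpToBucketSize(100 * 1024) == -1;       // too big for any class
        System.out.println("ok");
    }
}
```

Rounding up wastes the slack between the block size and its size class, which is the usual trade-off of a slab-style allocator: internal fragmentation in exchange for O(1), compaction-free allocation.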

Next, the block allocation path:

public synchronized long allocateBlock(int blockSize) throws CacheFullException,
      BucketAllocatorException {
    assert blockSize > 0;
    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
    if (bsi == null) {
      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
        "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
        " to accomodate if size seems reasonable and you want it cached.");
    }
    long offset = bsi.allocateBlock();

    // Ask caller to free up space and try again!
    if (offset < 0)
      throw new CacheFullException(blockSize, bsi.sizeIndex());
    usedSize += bucketSizes[bsi.sizeIndex()];
    return offset;
  }

  // BucketSizeInfo
  public long allocateBlock() {
      Bucket b = null;
      if (freeBuckets.size() > 0) {
        // Use up an existing one first...
        b = (Bucket) freeBuckets.lastKey();
      }
      if (b == null) {
        b = grabGlobalCompletelyFreeBucket();
        if (b != null) instantiateBucket(b);
      }
      if (b == null) return -1;
      long result = b.allocate();
      blockAllocated(b);
      return result;
    }

First the appropriate bucket size class is chosen for the requested size, and then that size class's BucketSizeInfo allocates the block:

  1. Take a bucket from freeBuckets; if none is available, grab a completely free bucket from the global pool.
  2. The bucket performs the allocation and returns the allocated offset.
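The fallback chain above can be sketched as follows. This is a simplification under stated assumptions: the `Bucket` here only counts free slots (real buckets compute byte offsets), and deques stand in for the LinkedMap bookkeeping:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AllocatorSketch {
    // Hypothetical fixed-capacity bucket: tracks how many item slots remain.
    static class Bucket {
        int freeSlots;
        Bucket(int freeSlots) { this.freeSlots = freeSlots; }
        long allocate() { freeSlots--; return 0L; } // real code returns a byte offset
    }

    final Deque<Bucket> freeBuckets = new ArrayDeque<>();           // partially used, has room
    final Deque<Bucket> completelyFreeBuckets = new ArrayDeque<>(); // untouched spares

    // Mirrors BucketSizeInfo.allocateBlock's fallback chain: prefer a
    // partially used bucket, then claim a completely free one, else fail.
    long allocateBlock() {
        Bucket b = freeBuckets.peekLast();
        if (b == null) {
            b = completelyFreeBuckets.poll(); // "grabGlobalCompletelyFreeBucket"
            if (b != null) freeBuckets.add(b);
        }
        if (b == null) return -1; // caller raises CacheFullException
        long offset = b.allocate();
        if (b.freeSlots == 0) freeBuckets.remove(b); // "blockAllocated" bookkeeping
        return offset;
    }

    public static void main(String[] args) {
        AllocatorSketch a = new AllocatorSketch();
        assert a.allocateBlock() == -1;            // nothing available anywhere
        a.completelyFreeBuckets.add(new Bucket(1));
        assert a.allocateBlock() == 0L;            // claims the spare bucket
        assert a.allocateBlock() == -1;            // that bucket is now full
        System.out.println("ok");
    }
}
```

Preferring partially used buckets over completely free ones keeps the completely free pool intact, so it can be reassigned to whichever size class runs out of space first.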

3. CombinedBlockCache

Next, how LruBlockCache and BucketCache are combined to handle blocks:

public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {
    boolean ismetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
    if (ismetaBlock || cacheDataInL1) {
      lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
    } else {
      l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
    }
  }
  1. In cacheBlock, meta blocks and blocks flagged with cacheDataInL1 go into the LruBlockCache; all other blocks go into the BucketCache.
  2. When the block cache is initialized, LruBlockCache's victimHandler is set to the BucketCache, so the BucketCache serves as a second-level cache behind the LruBlockCache.

That's all; corrections are welcome.
