2021SC@SDUSC
Contents
1. Introduction
2. BucketCache
3. CombinedBlockCache
1. Introduction
The previous two posts covered LruBlockCache and external caches. This one analyzes the implementation of BucketCache, and of CombinedBlockCache, which combines LruBlockCache and BucketCache.
2. BucketCache
First, a look at some of BucketCache's fields:
final IOEngine ioEngine; // where blocks are ultimately cached
final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; // staging area; blocks here are later written into the cache
ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; // records each block's metadata
// Multiple writer threads are responsible for writing blocks into the cache;
// each WriterThread has its own BlockingQueue.
final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
    new ArrayList<BlockingQueue<RAMQueueEntry>>();
@VisibleForTesting
final WriterThread[] writerThreads;
When a BucketCache is instantiated, the configured number of writer threads is started in the background, along with a statistics thread that logs cache statistics every five minutes.
Next, the cacheBlock path:
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
if (!cacheEnabled) {
return;
}
if (backingMap.containsKey(cacheKey)) {
return;
}
RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
if (ramCache.putIfAbsent(cacheKey, re) != null) {
return;
}
int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
successfulAddition = bq.offer(re);
}
if (!successfulAddition) {
ramCache.remove(cacheKey);
failedBlockAdditions.incrementAndGet();
} else {
this.blockNumber.incrementAndGet();
this.heapSize.addAndGet(cachedItem.heapSize());
blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
}
}
The processing logic:
- If backingMap already contains the given cacheKey, return immediately.
- Construct a RAMQueueEntry from the arguments and putIfAbsent it into ramCache; if an entry already exists there, return.
- Pick a writer queue by taking the cacheKey's hash value modulo the number of writerQueues, then offer the newly created RAMQueueEntry onto that queue.
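The queue-selection step above masks off the sign bit before taking the modulo, because Java's `%` can return a negative result for a negative hash code. A minimal sketch (class and method names are illustrative, not HBase's):

```java
/** Sketch of the hash-based writer-queue selection used in cacheBlockWithWait. */
class QueueSelector {
    /** Mask off the sign bit so the modulo result is always in [0, queueCount). */
    static int selectQueue(int keyHash, int queueCount) {
        return (keyHash & 0x7FFFFFFF) % queueCount;
    }
}
```

Without the `& 0x7FFFFFFF`, a key hashing to a negative value would produce a negative index and an ArrayIndexOutOfBoundsException when looking up the queue.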
This method only stages the block on a writer queue; the actual write into the cache is performed by the writer threads. Next, the WriterThread's run loop:
public void run() {
List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
try {
while (cacheEnabled && writerEnabled) {
try {
try {
// Blocks
entries = getRAMQueueEntries(inputQueue, entries);
} catch (InterruptedException ie) {
if (!cacheEnabled) break;
}
doDrain(entries);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
}
} catch (Throwable t) {
LOG.warn("Failed doing drain", t);
}
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
}
The processing logic:
- Take all RAMQueueEntry objects currently waiting on the queue.
- Call doDrain to write the entries into the cache, record each block's location in backingMap, and then remove the corresponding cacheKey from ramCache.
The actual write of a block into the cache is implemented by RAMQueueEntry's writeToCache:
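The first step (getRAMQueueEntries) follows a common batch-drain pattern: block until at least one entry arrives, then grab everything else already queued without blocking. A hypothetical sketch of just that step, outside any HBase types:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Illustrative sketch of the batch-drain step a writer thread performs. */
class DrainSketch {
    static <T> List<T> drainBatch(BlockingQueue<T> q, List<T> reuse) {
        reuse.clear();
        try {
            reuse.add(q.take()); // block until at least one entry arrives
            q.drainTo(reuse);    // then empty everything else without blocking
        } catch (InterruptedException ie) {
            // preserve the interrupt, as the real run() loop does
            Thread.currentThread().interrupt();
        }
        return reuse;
    }
}
```

Batching this way amortizes the cost of each doDrain pass over however many blocks accumulated while the previous pass was writing.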
public BucketEntry writeToCache(final IOEngine ioEngine,
final BucketAllocator bucketAllocator,
final UniqueIndexMap<Integer> deserialiserMap,
final AtomicLong realCacheSize) throws CacheFullException, IOException,
BucketAllocatorException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized...
if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len);
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try {
if (data instanceof HFileBlock) {
HFileBlock block = (HFileBlock) data;
ByteBuffer sliceBuf = block.getBufferReadonlyWithHeader();
sliceBuf.rewind();
assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
block.serializeExtraInfo(extraInfoBuffer);
ioEngine.write(sliceBuf, offset);
ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
} else {
ByteBuffer bb = ByteBuffer.allocate(len);
data.serialize(bb);
ioEngine.write(bb, offset);
}
} catch (IOException ioe) {
// free it in bucket allocator
bucketAllocator.freeBlock(offset);
throw ioe;
}
realCacheSize.addAndGet(len);
return bucketEntry;
}
}
BucketAllocator manages the cache space. At initialization it carves the space into fixed-size buckets, each four times the largest configured bucketSize; every bucket tracks its offset within the cache space. Buckets are further organized through bucketSizeInfos:
final class BucketSizeInfo {
// Free bucket means it has space to allocate a block;
// Completely free bucket means it has no block.
private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
private int sizeIndex;
}
Next, the block allocation process:
public synchronized long allocateBlock(int blockSize) throws CacheFullException,
BucketAllocatorException {
assert blockSize > 0;
BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
if (bsi == null) {
throw new BucketAllocatorException("Allocation too big size=" + blockSize +
"; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
" to accomodate if size seems reasonable and you want it cached.");
}
long offset = bsi.allocateBlock();
// Ask caller to free up space and try again!
if (offset < 0)
throw new CacheFullException(blockSize, bsi.sizeIndex());
usedSize += bucketSizes[bsi.sizeIndex()];
return offset;
}
// BucketSizeInfo
public long allocateBlock() {
Bucket b = null;
if (freeBuckets.size() > 0) {
// Use up an existing one first...
b = (Bucket) freeBuckets.lastKey();
}
if (b == null) {
b = grabGlobalCompletelyFreeBucket();
if (b != null) instantiateBucket(b);
}
if (b == null) return -1;
long result = b.allocate();
blockAllocated(b);
return result;
}
First, an appropriate bucket size is chosen for the requested length, and the BucketSizeInfo for that size carries out the allocation:
- Take a bucket from freeBuckets; if none is available, grab a completely free bucket from the global pool and instantiate it for this size.
- The bucket performs the allocation and returns the allocated offset.
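The size-selection step (roundUpToBucketSizeInfo) amounts to rounding the requested length up to the smallest configured bucket size that fits. A sketch with an illustrative size table (not HBase's default sizes):

```java
/** Sketch of rounding a requested block size up to the smallest bucket size
 *  that can hold it, as roundUpToBucketSizeInfo does. The size table here is
 *  made up for illustration. */
class SizeRounder {
    static final int[] BUCKET_SIZES = {4096, 8192, 16384, 65536}; // assumed, ascending

    static int roundUp(int blockSize) {
        for (int s : BUCKET_SIZES) {
            if (blockSize <= s) return s;
        }
        return -1; // too big for any bucket: the real allocator throws BucketAllocatorException
    }
}
```

Rounding up this way wastes some space inside each slot but keeps every bucket's slots uniform, which makes offset arithmetic and free-list management trivial.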
3. CombinedBlockCache
Now let's look at how LruBlockCache and BucketCache are combined to handle blocks:
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
final boolean cacheDataInL1) {
boolean ismetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
if (ismetaBlock || cacheDataInL1) {
lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
}
}
- In cacheBlock, meta blocks (or blocks with cacheDataInL1 set) are placed in the LruBlockCache; all other blocks go into the BucketCache.
- When the block cache is initialized, LruBlockCache's victimHandler is set to the BucketCache, so the BucketCache serves as a second-level cache behind the LruBlockCache.
That's all; if anything above is wrong, corrections are welcome.
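The first rule above is a pure routing decision on the block's category. A minimal sketch of that decision, with made-up names standing in for the two caches:

```java
/** Sketch of CombinedBlockCache's routing rule: meta blocks (and blocks
 *  explicitly flagged for L1) go to the on-heap LRU cache, data blocks to
 *  the BucketCache. Names here are illustrative. */
class CacheRouter {
    enum Tier { L1_LRU, L2_BUCKET }

    static Tier route(boolean isDataBlock, boolean cacheDataInL1) {
        boolean isMetaBlock = !isDataBlock; // anything that is not DATA counts as meta
        return (isMetaBlock || cacheDataInL1) ? Tier.L1_LRU : Tier.L2_BUCKET;
    }
}
```

Keeping small, hot meta blocks (index, bloom) on-heap while pushing bulky data blocks to the bucket tier is what lets the combined cache stay large without inflating GC pressure.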



