2021SC@SDUSC
目录
一、简介
二、淘汰缓存实现分析
三、LruBlockCache获取缓存数据
一、简介
如果每次读数据时都访问hfile的话,效率是非常低的,尤其是随机小数据量读时。为了提高IO的性能,Hbase提供了缓存机制BlockCache,LruBlockCache是它的方案之一。
本文我们接着上篇文章来分析。
二、淘汰缓存实现分析
上文我们输掉evict方法中,主线程通过回收线程对象的notifyAll来唤醒EvictionThread线程,那么它十什么时候wait的呢?来看它的run方法:
@Override
public void run() {
enteringRun = true;
while (this.go) {
synchronized(this) {
try {
this.wait(1000 * 10);
} catch(InterruptedException e) {
LOG.warn("Interrupted eviction thread ", e);
Thread.currentThread().interrupt();
}
}
LruBlockCache cache = this.cache.get();
if (cache == null) break;
cache.evict();
}
}
线程会wait10s,放弃对象锁,notifyAll()后,继续执行后面的淘汰流程:
void evict() {
// Ensure only one eviction at a time
if(!evictionLock.tryLock()) return;
try {
evictionInProgress = true;
long currentSize = this.size.get();
long bytesToFree = currentSize - minSize();
if (LOG.isTraceEnabled()) {
LOG.trace("Block cache LRU eviction started; Attempting to free " +
StringUtils.byteDesc(bytesToFree) + " of total=" +
StringUtils.byteDesc(currentSize));
}
if(bytesToFree <= 0) return;
// Instantiate priority buckets
BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
singleSize());
BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
multiSize());
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
memorySize());
// Scan entire map putting into appropriate buckets
for(LruCachedBlock cachedBlock : map.values()) {
switch(cachedBlock.getPriority()) {
case SINGLE: {
bucketSingle.add(cachedBlock);
break;
}
case MULTI: {
bucketMulti.add(cachedBlock);
break;
}
case MEMORY: {
bucketMemory.add(cachedBlock);
break;
}
}
}
long bytesFreed = 0;
if (forceInMemory || memoryFactor > 0.999f) {
long s = bucketSingle.totalSize();
long m = bucketMulti.totalSize();
if (bytesToFree > (s + m)) {
// this means we need to evict blocks in memory bucket to make room,
// so the single and multi buckets will be emptied
bytesFreed = bucketSingle.free(s);
bytesFreed += bucketMulti.free(m);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" from single and multi buckets");
}
bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" total from all three buckets ");
}
} else {
// this means no need to evict block in memory bucket,
// and we try best to make the ratio between single-bucket and
// multi-bucket is 1:2
long bytesRemain = s + m - bytesToFree;
if (3 * s <= bytesRemain) {
// single-bucket is small enough that no eviction happens for it
// hence all eviction goes from multi-bucket
bytesFreed = bucketMulti.free(bytesToFree);
} else if (3 * m <= 2 * bytesRemain) {
// multi-bucket is small enough that no eviction happens for it
// hence all eviction goes from single-bucket
bytesFreed = bucketSingle.free(bytesToFree);
} else {
// both buckets need to evict some blocks
bytesFreed = bucketSingle.free(s - bytesRemain / 3);
if (bytesFreed < bytesToFree) {
bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
}
}
}
} else {
PriorityQueue bucketQueue =
new PriorityQueue(3);
bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
int remainingBuckets = 3;
BlockBucket bucket;
while((bucket = bucketQueue.poll()) != null) {
long overflow = bucket.overflow();
if(overflow > 0) {
long bucketBytesToFree = Math.min(overflow,
(bytesToFree - bytesFreed) / remainingBuckets);
bytesFreed += bucket.free(bucketBytesToFree);
}
remainingBuckets--;
}
}
if (LOG.isTraceEnabled()) {
long single = bucketSingle.totalSize();
long multi = bucketMulti.totalSize();
long memory = bucketMemory.totalSize();
LOG.trace("Block cache LRU eviction completed; " +
"freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
"total=" + StringUtils.byteDesc(this.size.get()) + ", " +
"single=" + StringUtils.byteDesc(single) + ", " +
"multi=" + StringUtils.byteDesc(multi) + ", " +
"memory=" + StringUtils.byteDesc(memory));
}
} finally {
stats.evict();
evictionInProgress = false;
evictionLock.unlock();
}
}
处理逻辑如下:
- 通过互斥锁确保只有一个回收在执行
- 设置标志位evictionInProgress为true,表示正在进行回收过程
- 获取currentSize
- 计算应释放的缓冲大小bytesToFree,如果bytesToFree小于0,直接返回
- 实例化优先级队列:single、multi、memory
- 扫描缓存,分别加入三个优先级队列中
- 如果forceInMemory或者InMemory缓存超过99.9%
- 如果bytesToFree超过single和multi的大小和则全部回收二者的缓存,剩余的从InMemory中回收
- 否则不需要从InMemory中回收,按照一定的策略回收single、multi中的缓存,尝试让single-bucket和multi-bucket的比例为1:2
- 否则从三个优先级队列中循环回收
- 重置标志位
- 释放锁
三、LruBlockCache获取缓存数据
获取缓存数据是在getBlock中实现的:
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
LruCachedBlock cb = map.get(cacheKey);
if (cb == null) {
if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
// If there is another block cache then try and read there.
// However if this is a retry ( second time in double checked locking )
// And it's already a miss then the l2 will also be a miss.
if (victimHandler != null && !repeat) {
Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
// Promote this to L1.
if (result != null && caching) {
cacheBlock(cacheKey, result, false, true);
}
return result;
}
return null;
}
if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
cb.access(count.incrementAndGet());
return cb.getBuffer();
}
实现逻辑如下:
- 尝试从LruBlockCache的map中直接获取
- 如果map中没有、victimHandler存在且!repeat,通过victimHandler的getBlock方法获取并缓存到LruBlockCache中。
- 如果前面两步能获取到数据,更新统计数据,且通过缓存块的access方法,更新访问时间accesstime,将可能的BlockPriority.SINGLE升级为BlockPriority.MULTI
- return
以上。



