2021SC@SDUSC
Contents
1. Introduction
2. Cache Priority Levels
3. LruBlockCache Implementation Analysis
4. Eviction Implementation Analysis
1. Introduction
Reading every block straight from the HFile on disk is very inefficient, especially for small random reads. To improve I/O performance, HBase provides the BlockCache caching mechanism; LruBlockCache is one of its implementations.
2. Cache Priority Levels
Three cache priority levels are defined in the BlockPriority enum:
public enum BlockPriority {
  SINGLE,
  MULTI,
  MEMORY
}
- SINGLE: the priority for blocks accessed only once, e.g. by a scan; keeping them separate prevents a burst of one-off reads from evicting the rest of the cache
- MULTI: blocks that have been accessed multiple times
- MEMORY: blocks that stay resident in the cache
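In LruBlockCache, a block cached at SINGLE priority is promoted to MULTI the next time it is accessed, which is what the MULTI bullet above refers to. The following is a minimal sketch of that promotion rule only; the class and method names here are illustrative, not HBase's actual API:

```java
// Illustrative sketch of the SINGLE -> MULTI promotion rule.
// Names are stand-ins, not HBase's real classes.
public class PriorityPromotion {
    public enum BlockPriority { SINGLE, MULTI, MEMORY }

    // On a repeated access, a SINGLE block becomes MULTI;
    // MULTI and MEMORY blocks keep their priority.
    public static BlockPriority onAccess(BlockPriority current) {
        return current == BlockPriority.SINGLE ? BlockPriority.MULTI : current;
    }

    public static void main(String[] args) {
        BlockPriority p = BlockPriority.SINGLE;   // first access caches at SINGLE
        p = onAccess(p);                          // second access promotes to MULTI
        System.out.println(p);
        System.out.println(onAccess(BlockPriority.MEMORY)); // MEMORY is never demoted
    }
}
```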
3. LruBlockCache Implementation Analysis
cacheBlock() implements inserting a block into this cache:
// BlockCache implementation
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    final boolean cacheDataInL1) {
  if (buf.heapSize() > maxBlockSize) {
    // If there are a lot of blocks that are too
    // big this can make the logs way too noisy.
    // So we log 2%
    if (stats.failInsert() % 50 == 0) {
      LOG.warn("Trying to cache too large a block "
          + cacheKey.getHfileName() + " @ "
          + cacheKey.getOffset()
          + " is " + buf.heapSize()
          + " which is larger than " + maxBlockSize);
    }
    return;
  }
  LruCachedBlock cb = map.get(cacheKey);
  if (cb != null) {
    // compare the contents, if they are not equal, we are in big trouble
    if (compare(buf, cb.getBuffer()) != 0) {
      throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
    }
    String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
    msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
    LOG.warn(msg);
    return;
  }
  long currentSize = size.get();
  long currentAcceptableSize = acceptableSize();
  long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
  if (currentSize >= hardLimitSize) {
    stats.failInsert();
    if (LOG.isTraceEnabled()) {
      LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."
          + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
          + cacheKey + " into LruBlockCache.");
    }
    if (!evictionInProgress) {
      runEviction();
    }
    return;
  }
  cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  long newSize = updateSizeMetrics(cb, false);
  map.put(cacheKey, cb);
  long val = elements.incrementAndGet();
  if (LOG.isTraceEnabled()) {
    long size = map.size();
    assertCounterSanity(size, val);
  }
  if (newSize > currentAcceptableSize && !evictionInProgress) {
    runEviction();
  }
}
The method works as follows:
- If the block to be cached is larger than maxBlockSize, log a warning (only on every 50th failed insert, i.e. roughly 2% of the time, to keep the logs quiet) and return.
- Look up cacheKey in the cache map to see whether the block is already cached.
- If it is found: throw a RuntimeException if the cached contents differ from the new buffer; otherwise log a harmless warning and return.
- Read the current cache size and the acceptable size, and compute the hard limit hardLimitSize.
- If the current size has reached the hard limit, record the failed insert, run eviction if none is already in progress, and return.
- Otherwise construct an LruCachedBlock instance cb.
- Put cb into the cache map and update the size metrics.
- Atomically increment the element count.
- If the new cache size exceeds the acceptable size and no eviction is in progress, run eviction.
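The decision flow above can be sketched as a toy, single-class version. All types and fields below are simplified stand-ins for HBase's real classes (no Cacheable, no stats, no eviction), kept only to make the size check, duplicate check, and hard-limit check concrete:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Toy sketch of the cacheBlock() decision flow; not HBase's real code.
public class MiniLruCache {
    private final ConcurrentHashMap<String, byte[]> map = new ConcurrentHashMap<>();
    private final AtomicLong size = new AtomicLong();
    private final long maxBlockSize;
    private final long acceptableSize;
    private final double hardCapacityLimitFactor;

    public MiniLruCache(long maxBlockSize, long acceptableSize, double hardFactor) {
        this.maxBlockSize = maxBlockSize;
        this.acceptableSize = acceptableSize;
        this.hardCapacityLimitFactor = hardFactor;
    }

    /** Returns true if the block was inserted into the cache. */
    public boolean cacheBlock(String key, byte[] buf) {
        if (buf.length > maxBlockSize) {
            return false;                      // step 1: block too large, refuse
        }
        if (map.containsKey(key)) {
            return false;                      // steps 2-3: already cached, nothing to do
        }
        long hardLimit = (long) (hardCapacityLimitFactor * acceptableSize);
        if (size.get() >= hardLimit) {
            return false;                      // steps 4-5: over the hard limit
        }                                      // (real code would trigger eviction here)
        map.put(key, buf);                     // steps 6-8: insert and update the size
        size.addAndGet(buf.length);
        return true;
    }

    public long currentSize() { return size.get(); }
}
```

Note that, as in the real method, a block over the hard limit is simply not cached; correctness never depends on a block being in the cache, only performance.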
4. Eviction Implementation Analysis
Eviction can be carried out in two ways:
- directly on the calling thread, or
- on a dedicated eviction thread, which holds a WeakReference to the enclosing LruBlockCache (so the thread does not keep the cache alive).
Which mode is used is determined by the evictionThread flag passed to the constructor:
if (evictionThread) {
  this.evictionThread = new EvictionThread(this);
  this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
} else {
  this.evictionThread = null;
}
The eviction entry point runEviction() dispatches to the right mode:
private void runEviction() {
  if (evictionThread == null) {
    evict();
  } else {
    evictionThread.evict();
  }
}
EvictionThread.evict() is implemented as follows:
public void evict() {
  synchronized (this) {
    this.notifyAll();
  }
}
That is, the caller synchronizes on the eviction thread object to acquire its monitor and calls notifyAll(), waking the eviction thread, which is blocked in wait() inside its run() loop.
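The wait()/notifyAll() handshake can be demonstrated with a small self-contained example. This is a simplified sketch of the pattern, not HBase's EvictionThread: the class names and the go flag are mine, and the flag guards against the wake-up arriving before the thread reaches wait() (and against spurious wakeups):

```java
// Minimal sketch of the wake-up handshake between runEviction()
// and the eviction thread; simplified, not HBase's real code.
public class EvictionDemo {
    static class EvictionThread extends Thread {
        private boolean go = false;        // guarded by this object's monitor
        volatile int evictions = 0;

        @Override
        public synchronized void run() {   // holds the monitor except while in wait()
            while (true) {
                while (!go) {
                    try {
                        wait();            // sleep until evict() notifies us
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                go = false;
                evictions++;               // stand-in for the real eviction work
            }
        }

        // Called from the caller's thread: grab the eviction thread
        // object's monitor, set the flag, and wake the run() loop.
        public void evict() {
            synchronized (this) {
                go = true;
                notifyAll();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EvictionThread t = new EvictionThread();
        t.setDaemon(true);
        t.start();
        t.evict();                         // wake the eviction thread once
        while (t.evictions == 0) {
            Thread.sleep(5);
        }
        System.out.println("evictions = " + t.evictions);
    }
}
```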
To be continued…



