2021SC@SDUSC 2021SC@SDUSC
2021SC@SDUSC 2021SC@SDUSC
接上次博客 (2)随机写入MemStoreKeyValue写入Rgion分为两步:首先追加到HLog,再写入MemStore。MemStore使用数据结构ConcurrentSkipListMap来实际存储KeyValue.优点是能够非常友好地支持大规模并发写入,同时跳跃表本身是有序存储的,这有利于数据有序落盘,并且有利于提升MemStore中的KeyValue查找性能。
ConcurrentSkipListMap初始化函数:
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
head = new HeadIndex(new Node(null, base_HEADER, null),
null, null, 1);
}
MemStore的写入流程可以表述为以下3步。
- 检查当前可用的Chunk是否写满,如果写满,重新申请一个2M的Chunk。
- 将当前KeyValue在内存中重新构建,在可用Chunk的指定offset处申请内存创建一个新的KeyValue对象。
- 将新创建的KeyValue对象写入ConcurrentSkipListMap中。
Hbase中,Region是集群节点上最小的数据服务单元,用户数据表由一个或多个Region组成。在Region中每个ColumnFamily的数据组成一个Store。每个Store由一个Memstore和多个HFile组成。
HFile是由Memstore flush产生的,每一次的flush都会产生一个全新的HFile文件。
1.触发条件Hbase会在以下几种情况下触发MemStore Flush:
-
MemStore级别限制:当Region中任意一个MemStore的大小达到了上限hbase.hregion.memstore.flush.size(默认是134217728,即128MB),会触发MemStore刷写,生成一个HFile。
public static final String HREGION_MEMSTORE_FLUSH_SIZE = "hbase.hregion.memstore.flush.size";hbase.hregion.memstore.flush.size 134217728 -
Region级别限制:当Region中所有MemStore的大小总和达到了上限,会触发MemStore刷新。
-
RegionServer级别限制:当整个RegionServer中所有写入MemStore的数据大小总和超过低水位阈值,RegionServer开始强制执行flush,先flush MemStore最大的Region,再flush次大的,依次执行。
如果此时写入吞吐量依然很高,导致总MemStore的数据大小超过高水位阈值 hbase.regionserver.global.memstore.size,RegionServer会阻塞所有写入请求并强制执行flush,直至总MemStore大小下降到低水位阈值。
一旦出现RegionServer写入出现阻塞,查看日志中是否存在关键字Blocking updates on,如果存在说明当前RegionServer总MemStore内存大小超过了高水位阀值。public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size"; public static final String MEMSTORE_SIZE_OLD_KEY = "hbase.regionserver.global.memstore.upperLimit"; public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY = "hbase.regionserver.global.memstore.size.lower.limit"; public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY = "hbase.regionserver.global.memstore.lowerLimit"; -
当一个RegionServer中HLog数量达到上限:系统会选取最早的HLog对应的一个或多个Region进行flush。
-
Hbase定期刷新MemStore:时间间隔的配置项是:hbase.regionserver.optionalcacheflushinterval,默认值为3600000,即1个小时。如果设定为0,则意味着关闭定时自动刷写。
hbase.regionserver.optionalcacheflushinterval 3600000 Maximum amount of time an edit lives in memory before being automatically flushed. Default 1 hour. Set it to 0 to disable automatic flushing. -
手动执行flush:Admin接口也提供了方法来手动触发Memstore的刷写、hbase shell来手动触发Memstore的刷写。
- flush(TableName tableName):对单个表进行刷写。
- flushRegion(byte[] regionName):对单个Region进行刷写。
为了减少flush过程对读写的影响,Hbase采用了类似两阶段提交的方式,将整个flush过程分为三个阶段。
-
prepare阶段:遍历当前Region中的所有MemStore,将MemStore中当前数据集ConcurrentSkipListMap做一个快照snapshot,然后再新建一个CellSkipListSet接收新的数据写入。prepare阶段需要添加updateLock对写请求阻塞,结束之后会释放该锁。因为此阶段没有任何费时操作,因此持锁时间很短。
protected KeyValueScanner pollRealKV() throws IOException { KeyValueScanner kvScanner = heap.poll(); if (kvScanner == null) { return null; } while (kvScanner != null && !kvScanner.realSeekDone()) { if (kvScanner.peek() != null) { try { kvScanner.enforceSeek(); } catch (IOException ioe) { // Add the item to delayed close set in case it is leak from close this.scannersForDelayedClose.add(kvScanner); throw ioe; } Cell curKV = kvScanner.peek(); if (curKV != null) { KeyValueScanner nextEarliestScanner = heap.peek(); if (nextEarliestScanner == null) { // The heap is empty. Return the only possible scanner. return kvScanner; } Cell nextKV = nextEarliestScanner.peek(); if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { return kvScanner; } heap.add(kvScanner); } else { this.scannersForDelayedClose.add(kvScanner); } } else { this.scannersForDelayedClose.add(kvScanner); } kvScanner = heap.poll(); } return kvScanner; } -
flush阶段:遍历所有MemStore,将prepare阶段生成的snapshot持久化为临时文件,临时文件会统一放到region目录下的.tmp路径。这个过程因为涉及磁盘IO操作,因此相对比较耗时。
-
commit阶段:遍历所有的MemStore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下,针对HFile生成对应的storefile和Reader,把storefile添加到Store的storefiles列表中,最后再清空prepare阶段生成的snapshot。
HFileContext(boolean useHbaseChecksum, boolean includesMvcc, boolean includesTags, Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize, DataBlockEncoding encoding, Encryption.Context cryptoContext, long fileCreateTime, String hfileName, byte[] columnFamily, byte[] tableName, CellComparator cellComparator) { this.usesHbaseChecksum = useHbaseChecksum; this.includesMvcc = includesMvcc; this.includesTags = includesTags; this.compressAlgo = compressAlgo; this.compressTags = compressTags; this.checksumType = checksumType; this.bytesPerChecksum = bytesPerChecksum; this.blocksize = blockSize; if (encoding != null) { this.encoding = encoding; } this.cryptoContext = cryptoContext; this.fileCreateTime = fileCreateTime; this.hfileName = hfileName; this.columnFamily = columnFamily; this.tableName = tableName; // If no cellComparator specified, make a guess based off tablename. If hbase:meta, then should // be the meta table comparator. Comparators are per table. this.cellComparator = cellComparator != null ? cellComparator : this.tableName != null ? CellComparatorImpl.getCellComparator(this.tableName) : CellComparator.getInstance(); }
Hbase执行Flush操作之后将内存中的数据按照特定格式写成HFile文件。
该部分会再后续博客中分析。。。
至此写入流程大体结束


