2021SC@SDUSC
Get Process Source Code Analysis (Version 0.98), continued from the previous post
HTable.get()
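Following the statement scanner = getScanner(scan), look at HRegion.getScanner(). If the Scan names no column families, prepareScanner() first adds all of the table's column families to it. getScanner() then checks that each requested column family is valid, and finally constructs a RegionScanner through instantiateRegionScanner() and returns it.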
void prepareScanner(Scan scan) throws IOException {
  if (!scan.hasFamilies()) {
    // No families requested: read every column family of the table.
    for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
      scan.addFamily(family);
    }
  }
}

protected RegionScanner getScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  startRegionOperation(Operation.SCAN);
  try {
    prepareScanner(scan);
    if (scan.hasFamilies()) {
      // Every requested family must exist in the table descriptor.
      for (byte[] family : scan.getFamilyMap().keySet()) {
        checkFamily(family);
      }
    }
    return instantiateRegionScanner(scan, additionalScanners);
  } finally {
    closeRegionOperation(Operation.SCAN);
  }
}
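The two steps in getScanner() (fill in all families when none are requested, then validate each family) can be modeled in isolation. This is a simplified sketch, not HBase code: column families are plain strings, and FamilyCheckSketch, prepareFamilies() and checkFamilies() are hypothetical names. HBase reports an unknown family with NoSuchColumnFamilyException; the sketch uses IllegalArgumentException instead.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of HRegion.prepareScanner() + checkFamily(); not the HBase classes.
class FamilyCheckSketch {
  // Mirrors prepareScanner(): an empty family set means "all families of the table".
  static Set<String> prepareFamilies(Set<String> requested, List<String> tableFamilies) {
    Set<String> result = new LinkedHashSet<>(requested);
    if (result.isEmpty()) {
      result.addAll(tableFamilies);
    }
    return result;
  }

  // Mirrors the checkFamily() loop: reject any family the table does not define.
  static void checkFamilies(Set<String> families, List<String> tableFamilies) {
    for (String family : families) {
      if (!tableFamilies.contains(family)) {
        throw new IllegalArgumentException("No such column family: " + family);
      }
    }
  }
}
```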
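Next, look at HRegion.instantiateRegionScanner(). This is where the RegionScanner implementation class RegionScannerImpl is actually constructed (ReversedRegionScannerImpl, which is used for reversed scans, is set aside for now):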
protected RegionScanner instantiateRegionScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  if (scan.isReversed()) {
    if (scan.getFilter() != null) {
      scan.getFilter().setReversed(true);
    }
    return new ReversedRegionScannerImpl(scan, additionalScanners, this);
  }
  return new RegionScannerImpl(scan, additionalScanners, this);
}
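Now look at the constructor of the regular RegionScannerImpl. It initializes the StoreScanners: each column family corresponds to one Store and therefore to one StoreScanner. The StoreScanners of all column families are collected and placed into a sorted heap, the KeyValueHeap. As the branch at the end of the loop shows, the scanner of a family that the filter does not consider essential can go to joinedScanners instead of scanners when on-demand loading is enabled.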
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  KeyValueScanner scanner;
  try {
    scanner = store.getScanner(scan, entry.getValue(), this.readPt);
  } catch (FileNotFoundException e) {
    abortRegionServer(e.getMessage());
    throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
  }
  if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
      || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
  } else {
    joinedScanners.add(scanner);
  }
}
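The branch at the end of this loop decides which group each family's scanner joins. Below is a simplified model with hypothetical names (ScannerSplitSketch, split) and families as strings; the inputs hasFilter, loadOnDemand and isEssential stand for filter != null, scan.doLoadColumnFamiliesOnDemand() and filter.isFamilyEssential(family). As we understand the 0.98 code, the joined scanners end up in a separate joinedHeap that is only read for rows the filter has already accepted, so non-essential families are loaded on demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of how RegionScannerImpl splits per-family scanners into two groups.
class ScannerSplitSketch {
  static final class Split {
    final List<String> eager = new ArrayList<>();  // would feed storeHeap
    final List<String> joined = new ArrayList<>(); // would feed joinedHeap (on-demand)
  }

  static Split split(List<String> families, boolean hasFilter, boolean loadOnDemand,
                     Predicate<String> isEssential) {
    Split s = new Split();
    for (String family : families) {
      // Same condition as the original: no filter, on-demand loading off, or essential family.
      if (!hasFilter || !loadOnDemand || isEssential.test(family)) {
        s.eager.add(family);
      } else {
        s.joined.add(family);
      }
    }
    return s;
  }
}
```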
Back in HRegion.get(): once scanner = getScanner(scan) has executed, the data is read with scanner.next(results).
From RegionScannerImpl.next() the call leads to RegionScannerImpl.nextRaw(), and from there to RegionScannerImpl.nextInternal(); the intermediate code is not reproduced here.
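RegionScannerImpl.nextInternal() is long, so we go through it step by step. It first peeks at storeHeap to get the KeyValue at the head of the heap and records the row it belongs to (the backing buffer, the row offset and the row length). If the heap has no more data, current is null and currentRow stays null: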
KeyValue current = this.storeHeap.peek();
byte[] currentRow = null;
int offset = 0;
short length = 0;
if (current != null) {
  currentRow = current.getBuffer();
  offset = current.getRowOffset();
  length = current.getRowLength();
}
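It then checks whether the current row is the stop row. If it is, the row-level filter hook filterRowCells() is applied (when the filter has one) and the method returns false: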
boolean stopRow = isStopRow(currentRow, offset, length);
if (joinedContinuationRow == null) {
  if (stopRow) {
    if (filter != null && filter.hasFilterRow()) {
      filter.filterRowCells(results);
    }
    return false;
  }
  // ... (excerpt; the rest of this block is not shown)
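A simplified model of the stop-row test. It assumes, as we read the 0.98 source, that isStopRow() treats a null row as "no more data" and that the stop row is exclusive for a Scan but inclusive for a Get, whose start and stop rows are the same. StopRowSketch is a hypothetical class; rows are compared as unsigned bytes, the same ordering HBase's Bytes.compareTo uses.

```java
import java.util.Arrays;

// Hypothetical model of RegionScannerImpl.isStopRow(); not the HBase implementation.
class StopRowSketch {
  // Returns true when scanning must stop before currentRow.
  static boolean isStopRow(byte[] currentRow, byte[] stopRow, boolean isGet) {
    if (currentRow == null) {
      return true;                          // heap exhausted: nothing left to read
    }
    if (stopRow == null || stopRow.length == 0) {
      return false;                         // open-ended scan
    }
    int cmp = Arrays.compareUnsigned(stopRow, currentRow);
    // Get: stop only once we are past the requested row (row itself included).
    // Scan: stop as soon as currentRow reaches stopRow (stop row excluded).
    return isGet ? cmp < 0 : cmp <= 0;
  }
}
```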
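Next, the filter decides what to do with the row. If filterRowKey() rejects the row key, nextRow() skips the rest of this row; if there are no more rows the method returns false, otherwise the results are cleared and the loop continues: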
if (filterRowKey(currentRow, offset, length)) {
  boolean moreRows = nextRow(currentRow, offset, length);
  if (!moreRows) return false;
  results.clear();
  continue;
}
At this point the row has passed both the stop-row check and the row-key filter, so reading can begin:
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
    length);
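Now look at populateResult(), which iterates over the data. Its loop differs from the outer loop in nextInternal() in that populateResult() collects all columns of a single row, whereas the outer loop works row by row and is responsible for filtering a whole row (the stopRow check and the Filter).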
private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
    byte[] currentRow, int offset, short length) throws IOException {
  KeyValue nextKv;
  try {
    do {
      heap.next(results, limit - results.size());
      if (limit > 0 && results.size() == limit) {
        return KV_LIMIT;
      }
      nextKv = heap.peek();
    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
  } catch (FileNotFoundException e) {
    abortRegionServer(e.getMessage());
    throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
  }
  return nextKv;
}
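The per-row loop of populateResult() can be modeled with a sorted list standing in for the KeyValueHeap. PopulateRowSketch, its Cell class and populateRow() are hypothetical. Unlike the original, which pulls a batch from heap.next() on each iteration, the sketch consumes one cell at a time, and it returns the position of the first unconsumed cell instead of the next KeyValue (or KV_LIMIT).

```java
import java.util.List;

// Hypothetical model of populateResult(): gather every cell of one row, honoring a limit.
class PopulateRowSketch {
  // Minimal stand-in for a KeyValue: only row and column.
  static final class Cell {
    final String row;
    final String column;

    Cell(String row, String column) {
      this.row = row;
      this.column = column;
    }
  }

  // Starting at pos in a list sorted by row, adds cells of currentRow to results.
  // Stops when the row changes or, if limit > 0, once results holds limit cells.
  static int populateRow(List<Cell> sorted, int pos, String currentRow, int limit,
                         List<Cell> results) {
    while (pos < sorted.size() && sorted.get(pos).row.equals(currentRow)) {
      results.add(sorted.get(pos++));
      if (limit > 0 && results.size() == limit) {
        break; // the original returns KV_LIMIT here
      }
    }
    return pos;
  }
}
```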
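Next, look at KeyValueHeap.next(). It asks current, the scanner at the top of the heap (here a StoreScanner), to read the next data of the row; StoreScanner.next() is analyzed later. Afterwards that scanner is either closed, if it is exhausted, or put back into the heap, and pollRealKV() selects the new top scanner: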
public boolean next(List<Cell> result, int limit) throws IOException {
  if (this.current == null) {
    return false;
  }
  InternalScanner currentAsInternal = (InternalScanner) this.current;
  boolean mayContainMoreRows = currentAsInternal.next(result, limit);
  KeyValue pee = this.current.peek();
  if (pee == null || !mayContainMoreRows) {
    this.current.close();
  } else {
    this.heap.add(this.current);
  }
  this.current = null;
  this.current = pollRealKV();
  return (this.current != null);
}
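KeyValueHeap is, in essence, a k-way merge over sorted scanners. The sketch below models that merge with java.util.PriorityQueue over sorted lists of String keys. MergeHeapSketch and SubScanner are hypothetical names, and unlike the real class the sketch keeps every scanner inside the queue instead of holding the top one separately in current. As in the code above, an exhausted scanner is dropped and a non-exhausted one is re-inserted, so the heap always exposes the smallest remaining key.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge in the spirit of KeyValueHeap.
class MergeHeapSketch {
  // A sorted sub-scanner with a peekable current element (like StoreFileScanner.cur).
  static final class SubScanner {
    private final Iterator<String> it;
    String cur;

    SubScanner(List<String> sortedKeys) {
      it = sortedKeys.iterator();
      advance();
    }

    void advance() {
      cur = it.hasNext() ? it.next() : null;
    }
  }

  // Ordered by each sub-scanner's current key; only non-exhausted scanners are queued.
  private final PriorityQueue<SubScanner> heap =
      new PriorityQueue<>(Comparator.comparing((SubScanner s) -> s.cur));

  MergeHeapSketch(List<List<String>> sources) {
    for (List<String> src : sources) {
      SubScanner s = new SubScanner(src);
      if (s.cur != null) {
        heap.add(s); // empty scanners never enter the heap
      }
    }
  }

  String peek() {
    return heap.isEmpty() ? null : heap.peek().cur;
  }

  // Returns the smallest key and advances its scanner; re-inserts it if not exhausted.
  String next() {
    SubScanner top = heap.poll();
    if (top == null) {
      return null;
    }
    String result = top.cur;
    top.advance();
    if (top.cur != null) {
      heap.add(top);
    }
    return result;
  }
}
```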
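Back in RegionScannerImpl.nextInternal(): the current row has now been fully read. The method records whether the next row is a stop row and applies the row-level filter to the complete row. If the row is empty or excluded by the filter, the results are cleared and the scanner moves to the next row, continuing the loop unless that row is a stop row: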
stopRow = nextKv == null ||
    isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
final boolean isEmptyRow = results.isEmpty();
FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (filter != null && filter.hasFilterRow()) {
  ret = filter.filterRowCellsWithRet(results);
}
if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
  results.clear();
  boolean moreRows = nextRow(currentRow, offset, length);
  if (!moreRows) return false;
  if (!stopRow) continue;
  return false;
}
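Putting the pieces of nextInternal() together, the row-level control flow can be modeled as follows. RowLoopSketch is hypothetical: cells are {row, column} string pairs in sorted order, a Predicate stands in for filterRowKey(), the stop row is treated as exclusive (Scan semantics), and the filterRow() step and the joined heap are left out. As we read the original, the return value says whether more rows may follow, so the last row can come back in results together with false.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the row-level loop in RegionScannerImpl.nextInternal().
class RowLoopSketch {
  private final List<String[]> cells;           // {row, column}, sorted by row then column
  private final String stopRow;                 // exclusive stop row, null = scan to the end
  private final Predicate<String> filterRowKey; // true = skip this row
  private int pos = 0;                          // plays the role of storeHeap.peek()

  RowLoopSketch(List<String[]> cells, String stopRow, Predicate<String> filterRowKey) {
    this.cells = cells;
    this.stopRow = stopRow;
    this.filterRowKey = filterRowKey;
  }

  private String peekRow() {
    return pos < cells.size() ? cells.get(pos)[0] : null;
  }

  private boolean isStopRow(String row) {
    return row == null || (stopRow != null && stopRow.compareTo(row) <= 0);
  }

  // Adds the next accepted row to results; returns false when no further rows may follow.
  boolean next(List<String[]> results) {
    while (true) {
      String currentRow = peekRow();
      if (isStopRow(currentRow)) {
        return false;
      }
      if (filterRowKey.test(currentRow)) {
        while (currentRow.equals(peekRow())) pos++; // nextRow(): skip the rejected row
        results.clear();
        continue;
      }
      while (currentRow.equals(peekRow())) {
        results.add(cells.get(pos++));              // populateResult(): gather all columns
      }
      return !isStopRow(peekRow());                 // is the next row still in range?
    }
  }
}
```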
This completes the analysis of RegionScannerImpl.nextInternal(); next we turn to StoreScanner.next().
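Look at StoreScanner.next() (only the key code is shown). Inside the loop, ScanQueryMatcher.match() returns a MatchCode for each KeyValue, and the switch below handles each case. If the KeyValue passes the matcher's checks (row, column, versions and filter), one of the INCLUDE branches runs: the KeyValue is added to outResult, subject to the per-family storeLimit and storeOffset, and the scanner then advances to the next KeyValue, the next column or the next row. The other codes skip or seek without returning data. this.heap is itself a KeyValueHeap over the store's MemStore and StoreFile scanners, so it always exposes the smallest remaining KeyValue among them; a scanner that runs out of data is closed and dropped from the heap. The loop ends when the heap is empty, a DONE code is returned, or the limit is reached.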
LOOP: do {
  if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
  checkScanOrder(prevKV, kv, comparator);
  prevKV = kv;
  ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
  qcode = optimize(qcode, kv);
  switch (qcode) {
    case INCLUDE:
    case INCLUDE_AND_SEEK_NEXT_ROW:
    case INCLUDE_AND_SEEK_NEXT_COL:
      Filter f = matcher.getFilter();
      if (f != null) {
        kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv));
      }
      this.countPerRow++;
      if (storeLimit > -1 &&
          this.countPerRow > (storeLimit + storeOffset)) {
        // Per-family limit for this row reached: skip to the next row.
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
        break LOOP;
      }
      if (this.countPerRow > storeOffset) {
        outResult.add(kv);
        count++;
      }
      if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
      } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
        seekAsDirection(matcher.getKeyForNextColumn(kv));
      } else {
        this.heap.next();
      }
      if (limit > 0 && (count == limit)) {
        break LOOP;
      }
      continue;
    case DONE:
      return true;
    case DONE_SCAN:
      close();
      return false;
    case SEEK_NEXT_ROW:
      if (!matcher.moreRowsMayExistAfter(kv)) {
        return false;
      }
      seekToNextRow(kv);
      break;
    case SEEK_NEXT_COL:
      seekAsDirection(matcher.getKeyForNextColumn(kv));
      break;
    case SKIP:
      this.heap.next();
      break;
    case SEEK_NEXT_USING_HINT:
      KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
      if (nextKV != null) {
        seekAsDirection(nextKV);
      } else {
        heap.next();
      }
      break;
    default:
      throw new RuntimeException("UNEXPECTED");
  }
} while ((kv = this.heap.peek()) != null);
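In the INCLUDE branch, storeLimit and storeOffset are the per-column-family limit and offset of the request (we believe they come from Scan.setMaxResultsPerColumnFamily() and Scan.setRowOffsetPerColumnFamily()), and countPerRow counts the cells included so far in the current row. The hypothetical StoreLimitSketch below applies the same two comparisons to the cells the matcher has already included for one row, to show which of them reach outResult.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the per-row storeOffset / storeLimit bookkeeping in StoreScanner.next().
class StoreLimitSketch {
  // storeLimit < 0 means "no limit", mirroring the storeLimit > -1 check in the original.
  static List<String> selectColumns(List<String> includedCells, int storeLimit, int storeOffset) {
    List<String> out = new ArrayList<>();
    int countPerRow = 0;
    for (String cell : includedCells) {
      countPerRow++;
      if (storeLimit > -1 && countPerRow > storeLimit + storeOffset) {
        break;          // original: seekToNextRow(kv); break LOOP;
      }
      if (countPerRow > storeOffset) {
        out.add(cell);  // original: outResult.add(kv)
      }
    }
    return out;
  }
}
```

For example, with storeOffset = 1 and storeLimit = 2, the first included cell of the row is skipped and the next two are returned.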
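The peek() called here is KeyValueHeap.peek(), which returns the current KeyValue of the scanner at the top of the heap. For data stored in an HFile that scanner is a StoreFileScanner, and StoreFileScanner.peek() simply returns its current KeyValue, cur: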
public KeyValue peek() {
  return cur;
}
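StoreFileScanner.peek() only returns cur and does not move it. cur is moved forward by next() and by the seek methods; for example, StoreScanner.seekToNextRow() reseeks to a key that sorts after every cell of the current row: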
protected boolean seekToNextRow(KeyValue kv) throws IOException {
  return reseek(matcher.getKeyForNextRow(kv));
}
Following the call further down leads to StoreFileScanner.reseek(), where cur is updated through hfs.getKeyValue():
public boolean reseek(KeyValue key) throws IOException {
  if (seekCount != null) seekCount.incrementAndGet();
  try {
    try {
      if (!reseekAtOrAfter(hfs, key)) {
        close();
        return false;
      }
      cur = hfs.getKeyValue();
      return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
    } finally {
      realSeekDone = true;
    }
  } catch (FileNotFoundException e) {
    throw e;
  } catch (IOException ioe) {
    throw new IOException("Could not reseek " + this + " to key " + key,
        ioe);
  }
}
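peek(), next() and reseek() together form a forward-only cursor over a sorted file. The sketch below models that cursor over an in-memory sorted list. ReseekSketch is hypothetical, keys are {row, column} string pairs, and using Character.MAX_VALUE as a "last column" is an assumption of the sketch that stands in for the "last on row" key that matcher.getKeyForNextRow() produces. Because a reseek may assume the target is not behind the current position, the search starts from the current index instead of the beginning of the file.

```java
import java.util.List;

// Hypothetical model of StoreFileScanner-style cur / peek / reseek over one sorted file.
class ReseekSketch {
  private final List<String[]> keys; // {row, column}, sorted by row then column
  private int idx = 0;
  private String[] cur;              // what peek() returns

  ReseekSketch(List<String[]> keys) {
    this.keys = keys;
    this.cur = keys.isEmpty() ? null : keys.get(0);
  }

  String[] peek() {
    return cur;
  }

  private static int compare(String[] a, String[] b) {
    int c = a[0].compareTo(b[0]);
    return c != 0 ? c : a[1].compareTo(b[1]);
  }

  // Moves forward to the first key >= target; returns false once the file is exhausted.
  boolean reseek(String[] target) {
    while (idx < keys.size() && compare(keys.get(idx), target) < 0) {
      idx++;
    }
    cur = idx < keys.size() ? keys.get(idx) : null;
    return cur != null;
  }

  // Analogue of seekToNextRow(): reseek to a key sorting after every column of kv's row.
  boolean seekToNextRow(String[] kv) {
    return reseek(new String[] {kv[0], String.valueOf(Character.MAX_VALUE)});
  }
}
```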
Here hfs is an HFileScanner, which is an interface, so we need to determine which class's getKeyValue() is actually called. Tracing HStore.getScanner() from the RegionScannerImpl constructor gives the chain HStore.getScanner() -> new StoreScanner -> StoreScanner.getScannersNoCompaction() -> HStore.getScanners() -> StoreFileScanner.getScannersForStoreFiles -> StoreFile.Reader.getStoreFileScanner -> getScanner(). The final getScanner(), implemented in HFileReaderV2, is:
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
    final boolean isCompaction) {
  if (dataBlockEncoder.useEncodedScanner()) {
    return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
        hfileContext);
  }
  return new ScannerV2(this, cacheBlocks, pread, isCompaction);
}
It returns a ScannerV2, or an EncodedScannerV2 when data block encoding is in use. The relevant ScannerV2 methods are:
@Override
public KeyValue getKeyValue() {
  if (!isSeeked())
    return null;
  return formNoTagsKeyValue();
}

protected KeyValue formNoTagsKeyValue() {
  KeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
      + blockBuffer.position(), getCellBufSize());
  if (this.reader.shouldIncludeMemstoreTS()) {
    ret.setMvccVersion(currMemstoreTS);
  }
  return ret;
}
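Together with the KeyValue constructor below, this shows that ScannerV2 treats the currently loaded HFile data block (blockBuffer) as a byte array and builds a KeyValue by pointing at a slice of it, given by an offset and a length. The constructor only stores the array reference, the offset and the length; it does not copy the bytes.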
public KeyValue(final byte [] bytes, final int offset, final int length) {
  this.bytes = bytes;
  this.offset = offset;
  this.length = length;
}
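To make the offset/length idea concrete, the sketch below encodes cells in the classic KeyValue layout (as we understand it, ignoring tags): key length, value length, then the key (row length, row, family length, family, qualifier, timestamp, type) and the value. It then reads the row and value of a cell directly out of a shared buffer, the way getRowOffset() and getRowLength() locate the row in nextInternal() without copying. KeyValueLayoutSketch and its methods are hypothetical helpers, not HBase APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for the KeyValue byte layout:
// <keyLength:int><valueLength:int><rowLength:short><row><familyLength:byte><family>
// <qualifier><timestamp:long><type:byte><value>
class KeyValueLayoutSketch {
  static byte[] encode(String row, String family, String qualifier, long ts, byte type,
                       String value) {
    byte[] r = row.getBytes(StandardCharsets.UTF_8);
    byte[] f = family.getBytes(StandardCharsets.UTF_8);
    byte[] q = qualifier.getBytes(StandardCharsets.UTF_8);
    byte[] v = value.getBytes(StandardCharsets.UTF_8);
    int keyLength = 2 + r.length + 1 + f.length + q.length + 8 + 1;
    ByteBuffer buf = ByteBuffer.allocate(4 + 4 + keyLength + v.length); // big-endian
    buf.putInt(keyLength).putInt(v.length);
    buf.putShort((short) r.length).put(r);
    buf.put((byte) f.length).put(f).put(q);
    buf.putLong(ts).put(type);
    buf.put(v);
    return buf.array();
  }

  // The row starts 10 bytes into the cell: 4 (key length) + 4 (value length) + 2 (row length).
  static String row(byte[] bytes, int offset) {
    short rowLength = ByteBuffer.wrap(bytes, offset + 8, 2).getShort();
    return new String(bytes, offset + 10, rowLength, StandardCharsets.UTF_8);
  }

  // The value follows the key, which starts 8 bytes into the cell.
  static String value(byte[] bytes, int offset) {
    ByteBuffer b = ByteBuffer.wrap(bytes);
    int keyLength = b.getInt(offset);
    int valueLength = b.getInt(offset + 4);
    return new String(bytes, offset + 8 + keyLength, valueLength, StandardCharsets.UTF_8);
  }
}
```

Placing two encoded cells back to back in one array imitates a data block; the second cell is then read purely from its offset, which is what ScannerV2 passes to the KeyValue constructor.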