
2021SC@SDUSC HBase Project Analysis: Source Code Analysis of the Read Path (Part 4)

2021SC@SDUSC 

Contents

Source Code Analysis of the Get Process (Version 0.98)

HTable.get() 


Source Code Analysis of the Get Process (Version 0.98)

Continuing from the previous article.

HTable.get() 

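Starting from the statement scanner = getScanner(scan), look at HRegion.getScanner(). If the Scan names no column family, all column families of the table are added to it; each column family is then checked for validity, and finally instantiateRegionScanner() constructs the RegionScanner, which is returned: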

  void prepareScanner(Scan scan) throws IOException {
    // No column family specified: scan every family of the table
    if (!scan.hasFamilies()) {
      for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
        scan.addFamily(family);
      }
    }
  }

  protected RegionScanner getScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    startRegionOperation(Operation.SCAN);
    try {
      prepareScanner(scan);
      if (scan.hasFamilies()) {
        // Verify that every requested column family exists
        for (byte[] family : scan.getFamilyMap().keySet()) {
          checkFamily(family);
        }
      }
      return instantiateRegionScanner(scan, additionalScanners);
    } finally {
      closeRegionOperation(Operation.SCAN);
    }
  }
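The defaulting and validation performed by prepareScanner() and getScanner() can be modeled in a few lines. This is a simplified, self-contained sketch, not HBase code: ScanPrep and prepare() are hypothetical names, column families are plain Strings, and an IllegalArgumentException stands in for the exception HBase actually throws from checkFamily().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: a column family is a String, and a scan's family map
// goes from family name to the requested qualifiers (empty set = all columns).
class ScanPrep {
    static Map<String, Set<String>> prepare(Map<String, Set<String>> requested,
                                            List<String> tableFamilies) {
        Map<String, Set<String>> familyMap = new LinkedHashMap<>(requested);
        // Like prepareScanner(): no family requested means every family.
        if (familyMap.isEmpty()) {
            for (String family : tableFamilies) {
                familyMap.put(family, new TreeSet<>());
            }
        }
        // Like checkFamily(): reject a family the table does not define.
        // (IllegalArgumentException is a stand-in for HBase's own exception.)
        for (String family : familyMap.keySet()) {
            if (!tableFamilies.contains(family)) {
                throw new IllegalArgumentException("No such column family: " + family);
            }
        }
        return familyMap;
    }
}
```

An empty request therefore expands to every family of the table, while a request naming an unknown family fails before any scanner is built.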

Next, look at HRegion.instantiateRegionScanner(). This is where RegionScannerImpl, the implementation class of RegionScanner, is actually constructed (ReversedRegionScannerImpl is left aside for now):

  protected RegionScanner instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    if (scan.isReversed()) {
      if (scan.getFilter() != null) {
        scan.getFilter().setReversed(true);
      }
      return new ReversedRegionScannerImpl(scan, additionalScanners, this);
    }
    return new RegionScannerImpl(scan, additionalScanners, this);
  }

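Now look at the constructor of the ordinary RegionScannerImpl. It initializes the StoreScanners: each column family corresponds to one Store, and therefore to one StoreScanner, and the StoreScanners of all column families are placed into a sorted heap, the KeyValueHeap: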

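for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
    scan.getFamilyMap().entrySet()) {
  // One Store per column family, hence one StoreScanner per family
  Store store = stores.get(entry.getKey());
  KeyValueScanner scanner;
  try {
    scanner = store.getScanner(scan, entry.getValue(), this.readPt);
  } catch (FileNotFoundException e) {
    abortRegionServer(e.getMessage());
    throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
  }
  // Essential families are scanned eagerly; the others are loaded on demand
  if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
    || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
  } else {
    joinedScanners.add(scanner);
  }
}
// scanners then backs the storeHeap (a KeyValueHeap);
// joinedScanners, if any, go into a separate joinedHeap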
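The split between scanners and joinedScanners depends only on the three conditions in the excerpt. The sketch below reproduces that decision with hypothetical names: FamilySplit, a boolean for "a filter is set", and a Predicate standing in for Filter.isFamilyEssential(). Store scanners are represented simply by their family names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of how per-family scanners are divided in RegionScannerImpl.
class FamilySplit {
    final List<String> scanners = new ArrayList<>();       // would back storeHeap
    final List<String> joinedScanners = new ArrayList<>(); // would back joinedHeap

    FamilySplit(List<String> families, boolean hasFilter,
                boolean loadOnDemand, Predicate<String> isFamilyEssential) {
        for (String family : families) {
            // Same condition shape as the excerpt: no filter, no on-demand
            // loading, or an essential family means the family is read eagerly.
            if (!hasFilter || !loadOnDemand || isFamilyEssential.test(family)) {
                scanners.add(family);
            } else {
                joinedScanners.add(family);
            }
        }
    }
}
```

With on-demand loading enabled and a filter that only needs the meta family, the blob family ends up in joinedScanners, so its cells are fetched only for rows that pass the filter.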

Back in HRegion.get(): once scanner = getScanner(scan) has executed, the data is read with scanner.next(results).

RegionScannerImpl.next() leads to RegionScannerImpl.nextRaw(), which in turn leads to RegionScannerImpl.nextInternal(); the intermediate code is not reproduced here.

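RegionScannerImpl.nextInternal() is long, so we go through it step by step. It first peeks at the top of storeHeap; peek() does not advance the heap. If the heap is exhausted, current is null and currentRow stays null; otherwise the current row key is described by its backing buffer, offset and length: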

KeyValue current = this.storeHeap.peek();

byte[] currentRow = null;
int offset = 0;
short length = 0;
if (current != null) {
  currentRow = current.getBuffer();
  offset = current.getRowOffset();
  length = current.getRowLength();
}

Next it checks whether the current row is the stop row. If it is, nextInternal() returns false right away:

boolean stopRow = isStopRow(currentRow, offset, length);
if (joinedContinuationRow == null) {
  if (stopRow) {
    if (filter != null && filter.hasFilterRow()) {
      filter.filterRowCells(results);
    }
    return false;
  }
  // ... (this branch continues in the excerpts below)
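isStopRow() works on a row given as (buffer, offset, length). A minimal sketch of such a check follows, assuming the semantics of an ordinary Scan: the stop row is exclusive, an empty stop row means no upper bound, and a null row (heap exhausted) counts as a stop. Row keys are compared as unsigned bytes in lexicographic order. StopRow and its methods are hypothetical, a simplified stand-in for HBase's own isStopRow(), which also handles Gets.

```java
// Hypothetical stop-row check over a row stored as (buffer, offset, length).
class StopRow {
    // Lexicographic comparison of unsigned bytes, the order used for row keys.
    static int compareRows(byte[] a, int aOff, int aLen, byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int x = a[aOff + i] & 0xff;
            int y = b[bOff + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return aLen - bLen; // a shorter prefix sorts first
    }

    static boolean isStopRow(byte[] stopRow, byte[] row, int offset, int length) {
        if (row == null) {
            return true; // heap exhausted: nothing left to read
        }
        if (stopRow == null || stopRow.length == 0) {
            return false; // assumption: empty stop row = scan to the end
        }
        // Assumption: exclusive stop row, so stop once row >= stopRow.
        return compareRows(row, offset, length, stopRow, 0, stopRow.length) >= 0;
    }
}
```

The `& 0xff` masking matters: Java bytes are signed, so without it a row starting with 0x80 would wrongly sort before one starting with 0x7f.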

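Next the filter decides what to do. If the row key does not pass the filter, the scanner checks whether there are more rows: if not, it returns false; otherwise it clears the results and continues the loop: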

  if (filterRowKey(currentRow, offset, length)) {
    boolean moreRows = nextRow(currentRow, offset, length);
    if (!moreRows) return false;
    results.clear();
    continue;
  }

At this point the row has passed both the stop-row check and the row-key filter, so reading can begin:

KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
              length);

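Now look at populateResult(), which iterates over the cells. The difference between this loop and the outer loop in nextInternal() is that populateResult() collects all the columns of one row, while the outer loop iterates row by row and is responsible for filtering a whole row, including the stopRow check and the Filter.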

  private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
      byte[] currentRow, int offset, short length) throws IOException {
    KeyValue nextKv;
    try {
      do {
        // Read cells of the current row, at most (limit - results.size()) of them
        heap.next(results, limit - results.size());
        if (limit > 0 && results.size() == limit) {
          return KV_LIMIT;
        }
        nextKv = heap.peek();
        // Keep going while the next cell still belongs to currentRow
      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
    } catch (FileNotFoundException e) {
      abortRegionServer(e.getMessage());
      throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
    }
    return nextKv;
  }
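The division of labor described above (populateResult() gathers the cells of one row, the outer loop handles whole rows) can be sketched with a single sorted list of cells in place of the KeyValueHeap. RowCell, RowCollector and LIMIT_REACHED are hypothetical; LIMIT_REACHED only imitates the role of KV_LIMIT, and cells are consumed one at a time rather than in batches.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical cell: only the row and column matter for this sketch.
final class RowCell {
    final String row;
    final String column;
    RowCell(String row, String column) { this.row = row; this.column = column; }
}

// Sketch of populateResult(): pull cells from an already-sorted source until
// the row changes or the per-call limit is hit.
class RowCollector {
    static final RowCell LIMIT_REACHED = new RowCell("", ""); // plays the role of KV_LIMIT

    private final Iterator<RowCell> source;
    private RowCell next; // what heap.peek() would return

    RowCollector(List<RowCell> sortedCells) {
        this.source = sortedCells.iterator();
        this.next = source.hasNext() ? source.next() : null;
    }

    RowCell populate(List<RowCell> results, int limit, String currentRow) {
        while (next != null && next.row.equals(currentRow)) {
            results.add(next);
            next = source.hasNext() ? source.next() : null;
            if (limit > 0 && results.size() == limit) {
                return LIMIT_REACHED;
            }
        }
        return next; // first cell of the following row, or null when exhausted
    }
}
```

The returned cell is what the outer loop uses next: it is the peeked first cell of the following row, which is exactly what the stop-row check after populateResult() inspects.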

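Next, look at KeyValueHeap.next(). It calls current, the scanner at the top of the heap (here a StoreScanner), to read the next batch of cells; StoreScanner.next() is analyzed later. Afterwards current is either closed, if it has no more data, or put back into the heap, and pollRealKV() picks the scanner whose next KeyValue is now the smallest: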

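  public boolean next(List<Cell> result, int limit) throws IOException {
    if (this.current == null) {
      return false;
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current;
    // Let the current (smallest) scanner append its cells to result
    boolean mayContainMoreRows = currentAsInternal.next(result, limit);
    KeyValue pee = this.current.peek();
    if (pee == null || !mayContainMoreRows) {
      // Exhausted: close it instead of returning it to the heap
      this.current.close();
    } else {
      // Still has data: put it back so the heap can reorder it
      this.heap.add(this.current);
    }
    this.current = null;
    // Pick the scanner whose next KeyValue is now the smallest
    this.current = pollRealKV();
    return (this.current != null);
  }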
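KeyValueHeap is essentially a k-way merge: every scanner is already sorted, and a priority queue ordered by each scanner's peek() value decides which one to read next. The sketch below shows that idea with hypothetical ListScanner and MergeHeap classes over Strings; unlike the real KeyValueHeap it reads one element per call and has no equivalent of the lazy-seek handling in pollRealKV().

```java
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sorted scanner with HBase-style peek()/next().
class ListScanner {
    private final Iterator<String> it;
    private String cur;

    ListScanner(List<String> sorted) {
        it = sorted.iterator();
        cur = it.hasNext() ? it.next() : null;
    }

    String peek() { return cur; }

    String next() {
        String ret = cur;
        cur = it.hasNext() ? it.next() : null;
        return ret;
    }
}

// Sketch of the KeyValueHeap idea: "current" is the scanner with the smallest
// peek(); after reading from it, push it back only if it still has data.
class MergeHeap {
    private final PriorityQueue<ListScanner> heap =
        new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
    private ListScanner current;

    MergeHeap(List<ListScanner> scanners) {
        for (ListScanner s : scanners) {
            if (s.peek() != null) {
                heap.add(s); // empty scanners never enter the heap
            }
        }
        current = heap.poll();
    }

    String peek() { return current == null ? null : current.peek(); }

    String next() {
        if (current == null) {
            return null;
        }
        String ret = current.next();
        if (current.peek() != null) {
            heap.add(current); // re-insert under its new, larger key
        }
        current = heap.poll(); // cf. pollRealKV(): pick the new smallest
        return ret;
    }
}
```

Merging ["a", "d", "f"], ["b", "c", "g"] and an empty scanner yields a, b, c, d, f, g. A scanner is only mutated after it has been removed from the queue, which keeps the queue's ordering valid.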

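Back in RegionScannerImpl.nextInternal(), all cells of this row have now been read. The method then checks whether the next row is the stop row and applies the row-level filter. If the row is empty or excluded by the filter, the results are cleared and the scanner moves to the next row; the loop continues unless the stop row has been reached: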

stopRow = nextKv == null ||
    isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
final boolean isEmptyRow = results.isEmpty();

FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (filter != null && filter.hasFilterRow()) {
  ret = filter.filterRowCellsWithRet(results);
}

if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
  results.clear();
  boolean moreRows = nextRow(currentRow, offset, length);
  if (!moreRows) return false;

  if (!stopRow) continue;
  return false;
}
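The row-level control flow of nextInternal() (stop row, row-key filter, empty or excluded row) can be summarized as a loop over whole rows. This is a hypothetical sketch: rows live in a sorted map from row key to cells, filterRowKey and filterRow are plain predicates standing in for the Filter hooks, the stop row is treated as exclusive, and String ordering replaces unsigned-byte ordering (the two agree for ASCII keys).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Predicate;

// Hypothetical row loop in the spirit of nextInternal().
class RowLoop {
    static List<String> scan(NavigableMap<String, List<String>> rows, String stopRow,
                             Predicate<String> filterRowKey,
                             Predicate<List<String>> filterRow) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : rows.entrySet()) {
            String row = e.getKey();
            if (stopRow != null && row.compareTo(stopRow) >= 0) {
                break; // stop row reached (assumed exclusive)
            }
            if (filterRowKey.test(row)) {
                continue; // rejected by row key: skip the whole row
            }
            List<String> results = e.getValue(); // cf. populateResult()
            if (results.isEmpty() || filterRow.test(results)) {
                continue; // empty or excluded row: clear and move on
            }
            accepted.add(row);
        }
        return accepted;
    }
}
```

With stop row "r5", a row-key filter that rejects "r4" and a row filter that rejects rows with more than one cell, only "r1" survives among r1 to r5 when r2 is empty and r3 has two cells.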

This completes the analysis of RegionScannerImpl.nextInternal(). Next we turn to StoreScanner.next().

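Now look at StoreScanner.next() (only the key code is shown). Inside the loop, a ScanQueryMatcher decides what to do with each KeyValue; the possible outcomes are the cases of the switch below. A KeyValue that matches the row and the requested columns falls into one of the INCLUDE cases. The scanner keeps reading: each matching KeyValue is added to outResult, and this.heap is advanced to the next value. When the current KeyValueScanner has no more data, the KeyValueHeap switches to the next KeyValueScanner, and so on until the row is finished (DONE) or a limit is reached.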

LOOP: do {
  if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
  checkScanOrder(prevKV, kv, comparator);
  prevKV = kv;
  // Ask the matcher what to do with this KeyValue
  ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
  qcode = optimize(qcode, kv);
  switch(qcode) {
    case INCLUDE:
    case INCLUDE_AND_SEEK_NEXT_ROW:
    case INCLUDE_AND_SEEK_NEXT_COL:
      Filter f = matcher.getFilter();
      if (f != null) {
        kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv));
      }
      this.countPerRow++;
      if (storeLimit > -1 &&
          this.countPerRow > (storeLimit + storeOffset)) {
        // Per-row store limit reached: skip the rest of this row
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
        break LOOP;
      }
      if (this.countPerRow > storeOffset) {
        outResult.add(kv);
        count++;
      }
      if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
      } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
        seekAsDirection(matcher.getKeyForNextColumn(kv));
      } else {
        this.heap.next();
      }
      if (limit > 0 && (count == limit)) {
        break LOOP;
      }
      continue;
    case DONE:
      // The current row is finished
      return true;
    case DONE_SCAN:
      close();
      return false;
    case SEEK_NEXT_ROW:
      if (!matcher.moreRowsMayExistAfter(kv)) {
        return false;
      }
      seekToNextRow(kv);
      break;
    case SEEK_NEXT_COL:
      seekAsDirection(matcher.getKeyForNextColumn(kv));
      break;
    case SKIP:
      this.heap.next();
      break;
    case SEEK_NEXT_USING_HINT:
      KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
      if (nextKV != null) {
        seekAsDirection(nextKV);
      } else {
        heap.next();
      }
      break;
    default:
      throw new RuntimeException("UNEXPECTED");
  }
} while((kv = this.heap.peek()) != null);
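The switch above is driven entirely by the match code returned for each KeyValue. A toy version with only three codes shows the pattern: INCLUDE adds the cell and advances, SKIP just advances, and DONE ends the call when the row changes. VCell, MatchCode, VersionMatcher and StoreLoop are hypothetical; the matcher keeps at most maxVersions cells per column, similar in spirit to a column family's version limit, whereas the real ScanQueryMatcher can also return seek codes instead of skipping cell by cell.

```java
import java.util.List;

// Hypothetical cell, assumed sorted by row, then column, then newest first.
final class VCell {
    final String row;
    final String column;
    final long ts;
    VCell(String row, String column, long ts) { this.row = row; this.column = column; this.ts = ts; }
}

// Toy subset of the match codes used in the switch above.
enum MatchCode { INCLUDE, SKIP, DONE }

// Hypothetical matcher: keep at most maxVersions cells per column of one row.
class VersionMatcher {
    private final int maxVersions;
    private String row;
    private String column;
    private int versionsSeen;

    VersionMatcher(int maxVersions) { this.maxVersions = maxVersions; }

    void setRow(String row) { this.row = row; this.column = null; }

    MatchCode match(VCell c) {
        if (!c.row.equals(row)) {
            return MatchCode.DONE; // next row reached: this call is finished
        }
        if (!c.column.equals(column)) {
            column = c.column;
            versionsSeen = 0;
        }
        return ++versionsSeen <= maxVersions ? MatchCode.INCLUDE : MatchCode.SKIP;
    }
}

class StoreLoop {
    // Reads one row starting at pos; returns the index of the first unread cell.
    static int nextRow(List<VCell> cells, int pos, VersionMatcher m, List<VCell> out) {
        if (pos < cells.size()) {
            m.setRow(cells.get(pos).row);
        }
        while (pos < cells.size()) {
            VCell kv = cells.get(pos);
            switch (m.match(kv)) {
                case INCLUDE:
                    out.add(kv);
                    pos++;
                    break;
                case SKIP:
                    pos++;
                    break;
                case DONE:
                    return pos;
            }
        }
        return pos;
    }
}
```

For cells r1/a@3, r1/a@2, r1/a@1, r1/b@5, r2/a@9 and maxVersions = 2, one call collects r1/a@3, r1/a@2 and r1/b@5 and stops at index 4, the first cell of r2.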

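The peek() called here is KeyValueHeap.peek(), which delegates to its current scanner; for data stored in an HFile that is StoreFileScanner.peek(). As the code shows, it simply returns the current KeyValue: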

  public KeyValue peek() {
    return cur;
  }

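StoreFileScanner.peek() only returns the current KeyValue; cur is actually moved by the scanner's next() and seek methods. StoreScanner.seekToNextRow(), for example, reseeks to the key the matcher returns for the next row: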

  protected boolean seekToNextRow(KeyValue kv) throws IOException {
    return reseek(matcher.getKeyForNextRow(kv));
  }
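seekToNextRow() does not walk over the remaining columns of the row; it asks for a key that sorts after every cell of the current row and reseeks to it. The sketch below imitates this with a hypothetical (row, column) key in which a null column means "last possible key on this row", in the spirit of HBase's "last on row" seek keys; RowColKey and NextRowSeek are hypothetical, and the binary search stands in for the HFile's own seek.

```java
import java.util.Comparator;

// Hypothetical (row, column) key; column == null sorts after every real
// column of the same row, so it can serve as a "skip this row" target.
final class RowColKey {
    final String row;
    final String column;
    RowColKey(String row, String column) { this.row = row; this.column = column; }

    static RowColKey lastOnRow(String row) { return new RowColKey(row, null); }

    static final Comparator<RowColKey> ORDER = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        if (a.column == null) return b.column == null ? 0 : 1;
        if (b.column == null) return -1;
        return a.column.compareTo(b.column);
    };
}

class NextRowSeek {
    // Index of the first key >= target (lower-bound binary search).
    static int seekAtOrAfter(RowColKey[] keys, RowColKey target) {
        int lo = 0;
        int hi = keys.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (RowColKey.ORDER.compare(keys[mid], target) < 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Like seekToNextRow(kv): jump past every remaining column of kv's row.
    static int seekToNextRow(RowColKey[] keys, RowColKey kv) {
        return seekAtOrAfter(keys, RowColKey.lastOnRow(kv.row));
    }
}
```

Starting from r1/a in the keys r1/a, r1/b, r1/c, r2/a, a single seek lands on r2/a (index 3) without visiting r1/b or r1/c; seeking past the last row returns the array length, meaning the scanner is exhausted.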

Following the call chain further leads to StoreFileScanner.reseek(), where cur is updated with hfs.getKeyValue():

  public boolean reseek(KeyValue key) throws IOException {
    if (seekCount != null) seekCount.incrementAndGet();

    try {
      try {
        if (!reseekAtOrAfter(hfs, key)) {
          // No KeyValue at or after key in this file: close the scanner
          close();
          return false;
        }
        // Cache the KeyValue the HFileScanner is now positioned on
        cur = hfs.getKeyValue();

        // Skip cells written after this scanner's MVCC read point
        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not reseek " + this + " to key " + key,
          ioe);
    }
  }
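The contract of StoreFileScanner.reseek() can be reduced to a small forward-only cursor: peek() returns the cached cur without moving, reseek(target) moves forward to the first key at or after target and refreshes cur, and a reseek that runs off the end closes the scanner. FileCursor is a hypothetical class over a sorted list of integers; it scans linearly where the real HFileScanner can use the block index, and it ignores MVCC read points.

```java
import java.util.List;

// Hypothetical forward-only cursor modeled on the reseek() contract.
class FileCursor {
    private final List<Integer> keys; // sorted keys standing in for an HFile
    private int pos;
    private Integer cur;
    private boolean closed;

    FileCursor(List<Integer> sortedKeys) {
        this.keys = sortedKeys;
        this.cur = keys.isEmpty() ? null : keys.get(0);
        this.closed = keys.isEmpty();
    }

    Integer peek() { return cur; } // like StoreFileScanner.peek(): no movement

    boolean reseek(int target) {
        if (closed) {
            return false;
        }
        // A reseek only moves forward from the current position.
        while (pos < keys.size() && keys.get(pos) < target) {
            pos++;
        }
        if (pos == keys.size()) { // nothing at or after target
            close();
            return false;
        }
        cur = keys.get(pos); // like cur = hfs.getKeyValue()
        return true;
    }

    void close() {
        closed = true;
        cur = null;
    }
}
```

On keys 10, 20, 30, reseek(15) positions the cursor on 20; a later reseek(5) leaves it on 20 because it never moves backward; reseek(31) closes it and peek() then returns null.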

Here hfs is declared as the HFileScanner interface, so we need to determine which class's getKeyValue() is actually invoked. We can trace this from HStore.getScanner(), which is called in the RegionScannerImpl constructor: HStore.getScanner() -> new StoreScanner -> StoreScanner.getScannersNoCompaction() -> HStore.getScanners() -> StoreFileScanner.getScannersForStoreFiles -> StoreFile.Reader.getStoreFileScanner -> getScanner(). The final getScanner() method is:

  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
      final boolean isCompaction) {
    if (dataBlockEncoder.useEncodedScanner()) {
      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
          hfileContext);
    }

    return new ScannerV2(this, cacheBlocks, pread, isCompaction);
  }

This leads to the relevant methods of ScannerV2:

    @Override
    public KeyValue getKeyValue() {
      if (!isSeeked())
        return null;
      return formNoTagsKeyValue();
    }

    protected KeyValue formNoTagsKeyValue() {
      // Wrap the bytes of the current cell inside the block buffer (no copy)
      KeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
          + blockBuffer.position(), getCellBufSize());
      if (this.reader.shouldIncludeMemstoreTS()) {
        ret.setMvccVersion(currMemstoreTS);
      }
      return ret;
    }

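Combined with the KeyValue constructor below, this shows that ScannerV2 treats the loaded HFile data block as a byte array: a KeyValue is obtained by wrapping that array with an offset and a length, without copying the bytes.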

  public KeyValue(final byte [] bytes, final int offset, final int length) {
    this.bytes = bytes;
    this.offset = offset;
    this.length = length;
  }
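To see why (bytes, offset, length) is enough, recall the classic KeyValue layout without tags: key length (int), value length (int), then the key (row length as a short, row, family length as a byte, family, qualifier, timestamp as a long, type as a byte), then the value. KvLayout below is a hypothetical helper, not the HBase class: it encodes one cell with java.nio.ByteBuffer and locates the row purely by offset arithmetic.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the tag-less KeyValue layout:
// <keyLength:int><valueLength:int><rowLength:short><row><familyLength:byte>
// <family><qualifier><timestamp:long><type:byte><value>
class KvLayout {
    static byte[] encode(String row, String family, String qualifier,
                         long ts, byte type, String value) {
        byte[] r = row.getBytes(StandardCharsets.UTF_8);
        byte[] f = family.getBytes(StandardCharsets.UTF_8);
        byte[] q = qualifier.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        int keyLength = 2 + r.length + 1 + f.length + q.length + 8 + 1;
        // ByteBuffer is big-endian by default
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + keyLength + v.length);
        buf.putInt(keyLength).putInt(v.length);
        buf.putShort((short) r.length).put(r);
        buf.put((byte) f.length).put(f).put(q);
        buf.putLong(ts).put(type);
        buf.put(v);
        return buf.array();
    }

    // The row length sits right after the two length ints.
    static short rowLength(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes).getShort(offset + 8);
    }

    // The row starts after keyLength, valueLength and rowLength.
    static int rowOffset(int offset) {
        return offset + 4 + 4 + 2;
    }
}
```

For example, copying an encoded cell to offset 3 of a larger array, as if it sat inside a data block, and calling rowLength(block, 3) and rowOffset(3) locates the row bytes in place; the cell never has to be copied out of the block.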

Reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/344337.html