
2021SC@SDUSC HBase Project Analysis: The compact Flow (Part 2)


Contents

compact source code analysis



Continuing the analysis from the previous post.

compact source code analysis

Let's look at the HStore.compact() method invoked from HRegion.compact(). Its flow is:

  1. Get the compaction request CompactionRequest from the CompactionContext parameter compaction
  2. Record the compaction start time
  3. Assert that the compaction request is not null
  4. Get the collection of files to compact, filesToCompact, from cr
  5. Assert that filesToCompact is not empty
  6. Make sure filesCompacting contains all of the files to be compacted
  7. Call CompactionContext.compact() to perform the merge; it returns the newly written files
  8. Check the hbase.hstore.compaction.complete parameter (default true) to decide whether to complete the compaction. If it is false, create a StoreFile list sfs; for each merged file in newFiles, create a StoreFile and its Reader, close the Reader, and add the StoreFile to sfs; then return sfs and end the method here. If it is true, skip this and continue.
  9. Call moveCompatedFilesIntoPlace() to move the merged files into their final location and create a StoreFile and Reader for each; it returns a StoreFile list
  10. Write a compaction record to the WAL
  11. Replace the StoreFiles
  12. Depending on the compaction type (major or minor), bump the corresponding counters for performance monitoring
  13. Archive the old files, close their Readers, and update the store size
  14. Log the result
  15. Return sfs
  @Override
  public List<StoreFile> compact(CompactionContext compaction,
    CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    // Get the compaction request CompactionRequest from the CompactionContext
    CompactionRequest cr = compaction.getRequest();
    try {
      // Record the compaction start time
      long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
      // Make sure the compaction request is not null
      assert compaction.hasSelection();

      // Get the collection of files to compact from cr
      Collection<StoreFile> filesToCompact = cr.getFiles();
      // Make sure filesToCompact is not empty
      assert !filesToCompact.isEmpty();

      // Make sure filesCompacting contains all of the files to be compacted
      synchronized (filesCompacting) {
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
          + this + " of " + this.getRegionInfo().getRegionNameAsString()
          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
          + StringUtils.humanReadableInt(cr.getSize()));

      // Call CompactionContext.compact() to perform the merge; it returns the new files
      List<Path> newFiles = compaction.compact(throughputController, user);
      // Check hbase.hstore.compaction.complete to decide whether to complete the compaction
      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        // Create the StoreFile list sfs
        sfs = new ArrayList<StoreFile>(newFiles.size());
        // For each merged file, create a StoreFile and Reader, close the Reader,
        // and add the StoreFile to sfs
        for (Path newFile : newFiles) {
          StoreFile sf = createStoreFileAndReader(newFile);
          sf.closeReader(true);
          sfs.add(sf);
        }
        return sfs;
      }
      // Move the merged files into place and create a StoreFile and Reader for each;
      // moveCompatedFilesIntoPlace() returns the StoreFile list
      sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
      // Write a compaction record to the WAL
      writeCompactionWalRecord(filesToCompact, sfs);
      // Replace the StoreFiles
      replaceStoreFiles(filesToCompact, sfs);

      // Bump the counters matching the compaction type, for performance monitoring
      if (cr.isMajor()) {
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else {
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // Archive the old files, close their Readers, and update the store size
      completeCompaction(filesToCompact); // Archive old files & update store size.
      // Log the result
      logCompactionEndMessage(cr, sfs, compactionStartTime);
      // Return sfs
      return sfs;
    } finally {
      finishCompactionRequest(cr);
    }
  }

That covers the basic flow of HStore.compact(); now let's look at some of the details.

Step 9: call moveCompatedFilesIntoPlace() to move the merged files into their final location and create a StoreFile and Reader for each; it returns a StoreFile list.

Looking at moveCompatedFilesIntoPlace(): it first creates a StoreFile list sfs, then iterates over the merged files newFiles, calling moveFileIntoPlace() on each file to move it into place and create a StoreFile and Reader for it, and finally adds the resulting StoreFile to sfs.

 private List<StoreFile> moveCompatedFilesIntoPlace(
      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
    // Create the StoreFile list
    List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
    // Iterate over newFiles
    for (Path newFile : newFiles) {
      assert newFile != null;
      // Move newFile into place and create a StoreFile and Reader for it
      final StoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        final Store thisStore = this;
        if (user == null) {
          getCoprocessorHost().postCompact(thisStore, sf, cr);
        } else {
          try {
            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                getCoprocessorHost().postCompact(thisStore, sf, cr);
                return null;
              }
            });
          } catch (InterruptedException ie) {
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
        }
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }
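The user == null branch and the conversion of InterruptedException into InterruptedIOException above follow a common pattern: run the post-compact hook as a given user, and surface a thread interrupt to callers that only handle IOException. A minimal, self-contained sketch of that pattern — the Action interface and runQuietly() are hypothetical stand-ins for Hadoop's PrivilegedExceptionAction/doAs, not HBase code:

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Sketch of the wrap-and-rethrow pattern: run an action, converting a thread
// interrupt into an InterruptedIOException so that callers declaring only
// IOException still observe the interrupt.
public class DoAsSketch {
  interface Action { void run() throws Exception; }

  static void runQuietly(Action action) throws IOException {
    try {
      action.run();
    } catch (InterruptedException ie) {
      // Preserve the interrupt as an IOException subtype, keeping the cause
      InterruptedIOException iioe = new InterruptedIOException();
      iioe.initCause(ie);
      throw iioe;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    final int[] calls = {0};
    runQuietly(() -> calls[0]++);
    assert calls[0] == 1;
    try {
      runQuietly(() -> { throw new InterruptedException(); });
      assert false; // unreachable: the interrupt must be rethrown
    } catch (InterruptedIOException expected) {
      // interrupt surfaced as an IOException subtype, as in the HBase code
    }
  }
}
```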

From there, let's look at moveFileIntoPlace(): it first validates the new file, then calls HRegionFileSystem.commitStoreFile() to move it to its final location, and finally calls createStoreFileAndReader() to create the StoreFile and Reader.

  StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
    // Validate the new file
    validateStoreFile(newFile);
    // Move the file to its final location
    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
    // Create the StoreFile and Reader
    return createStoreFileAndReader(destPath);
  }

Looking at HRegionFileSystem.commitStoreFile(): it resolves the store directory from the column family name familyName, checks that the directory exists and creates it if not, derives the file name from buildPath, builds the destination path dstPath from the store directory storeDir and the name, and finally moves the file from buildPath to dstPath via rename().

  private Path commitStoreFile(final String familyName, final Path buildPath,
      final long seqNum, final boolean generateNewName) throws IOException {
    // Resolve the store directory from the column family name familyName
    Path storeDir = getStoreDir(familyName);
    // If the directory does not exist in fs and cannot be created, throw
    if (!fs.exists(storeDir) && !createDir(storeDir))
      throw new IOException("Failed creating " + storeDir);

    String name = buildPath.getName();
    if (generateNewName) {
      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
    }
    Path dstPath = new Path(storeDir, name);
    if (!fs.exists(buildPath)) {
      throw new FileNotFoundException(buildPath.toString());
    }
    LOG.debug("Committing store file " + buildPath + " as " + dstPath);
    if (!rename(buildPath, dstPath)) {
      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
    }
    return dstPath;
  }
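The commit sequence above — make sure the store directory exists, verify the built file, then move it into place with a rename — can be sketched with plain java.nio.file. CommitSketch and commitFile() are illustrative names for this post, not HBase APIs:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative stand-in for the commitStoreFile() pattern: ensure the
// destination directory exists, verify the source, then rename into place.
public class CommitSketch {
  static Path commitFile(Path storeDir, Path buildPath) throws IOException {
    if (!Files.exists(storeDir)) {
      Files.createDirectories(storeDir); // throws IOException on failure
    }
    if (!Files.exists(buildPath)) {
      throw new NoSuchFileException(buildPath.toString());
    }
    Path dstPath = storeDir.resolve(buildPath.getFileName());
    // ATOMIC_MOVE mirrors rename() semantics: same filesystem, no copy
    return Files.move(buildPath, dstPath, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempDirectory("commit-sketch");
    Path build = Files.writeString(tmp.resolve("hfile.tmp"), "data");
    Path committed = commitFile(tmp.resolve("cf"), build);
    assert Files.exists(committed);
    assert !Files.exists(build); // the source was moved, not copied
    System.out.println(committed.getFileName()); // prints hfile.tmp
  }
}
```

The rename is what makes the commit cheap: the compacted file is written in full under a temp directory, so moving it into the store directory is a metadata operation, not a data copy.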

That completes step 9. Now on to step 10: writing the compaction record to the WAL.

Looking at writeCompactionWalRecord(): it adds the paths of the compacted files to the inputPaths list and the paths of the new files to the outputPaths list, gets the HRegionInfo, builds the compaction's descriptor CompactionDescriptor from it, and finally writes the marker to the WAL via writeCompactionMarker().

  private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
      Collection<StoreFile> newFiles) throws IOException {
    if (region.getLog() == null) return;
    // Add the paths of the compacted files to the inputPaths list
    List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
    for (StoreFile f : filesCompacted) {
      inputPaths.add(f.getPath());
    }
    // Add the paths of the new files to the outputPaths list
    List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
    for (StoreFile f : newFiles) {
      outputPaths.add(f.getPath());
    }
    // Get the HRegionInfo
    HRegionInfo info = this.region.getRegionInfo();
    // Build the compaction's descriptor CompactionDescriptor
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
        family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
    // Call writeCompactionMarker() to write a compaction marker into the WAL
    HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
        this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
  }

Looking at the structure of CompactionDescriptor and the toCompactionDescriptor() method: CompactionDescriptor carries the key information such as the table name table_name, the encoded region name encoded_region_name, and the column family name family_name.

message CompactionDescriptor {
  required bytes table_name = 1; // TODO: WALKey already stores these, might remove
  required bytes encoded_region_name = 2;
  required bytes family_name = 3;
  repeated string compaction_input = 4;
  repeated string compaction_output = 5;
  required string store_home_dir = 6;
  optional bytes  region_name = 7; // full region name
}

  public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
      List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
    // compaction descriptor contains relative paths.
    // input / output paths are relative to the store dir
    // store dir is relative to region dir
    CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
        .setTableName(ByteStringer.wrap(info.getTableName()))
        .setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
        .setFamilyName(ByteStringer.wrap(family))
        .setStoreHomeDir(storeDir.getName()); //make relative
    for (Path inputPath : inputPaths) {
      builder.addCompactionInput(inputPath.getName()); //relative path
    }
    for (Path outputPath : outputPaths) {
      builder.addCompactionOutput(outputPath.getName());
    }
    builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
    return builder.build();
  }
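As the comments in toCompactionDescriptor() note, the descriptor stores only the last path component of each file, i.e. paths relative to the store directory. A small sketch of that trimming with java.nio.file; the class and method names are illustrative, not HBase's:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch: keep only the last path component of each input path, mirroring
// inputPath.getName() in toCompactionDescriptor(). Storing relative names
// keeps the WAL marker valid even if the root directory changes.
public class RelativePathSketch {
  static List<String> toRelativeNames(List<Path> paths) {
    List<String> names = new ArrayList<>(paths.size());
    for (Path p : paths) {
      names.add(p.getFileName().toString()); // drop the directory prefix
    }
    return names;
  }

  public static void main(String[] args) {
    List<String> names = toRelativeNames(
        List.of(Path.of("/hbase/data/t1/region/cf/file1"),
                Path.of("/hbase/data/t1/region/cf/file2")));
    assert names.equals(List.of("file1", "file2"));
  }
}
```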
