栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC HBase项目分析:compact流程分析(三)

2021SC@SDUSC HBase项目分析:compact流程分析(三)

2021SC@SDUSC

目录

compact源码分析


2021SC@SDUSC 

接上一篇继续分析

compact源码分析

现在来看第十一步——替换StoreFiles

查看replaceStoreFiles()方法:调用addCompactionResults()去除已被合并的文件compactedFiles,将合并后的文件添加到StoreFileManager的storefiles中;把filesCompacting中已被合并的文件删除

  private void replaceStoreFiles(final Collection compactedFiles,
      final Collection result) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

第十三步——归档旧文件,关闭旧文件上的Reader,并更新store大小

查看completeCompaction()方法:

@VisibleForTesting
  protected void completeCompaction(final Collection compactedFiles)
      throws IOException {
    try {
      notifyChangedReadersObservers();
      LOG.debug("Removing store files after compaction...");
      //遍历已被合并的文件,关闭其上的Reader
      for (StoreFile compactedFile : compactedFiles) {
        compactedFile.closeReader(true);
      }
      //归档,将旧的文件从原位置移到归档目录下
      this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.error("Failed removing compacted files in " + this +
        ". Files we were trying to remove are " + compactedFiles.toString() +
        "; some of them may have been already removed", e);
    }
    //计算新的store大小
    this.storeSize = 0L;
    this.totalUncompressedBytes = 0L;
    for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + hsf + " has a null Reader");
        continue;
      }
      this.storeSize += r.length();
      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
    }
  }

据此查看HRegionFileSystem.removeStoreFiles()方法 :可以看到它是通过HFileArchiver.archiveStoreFiles()方法来完成

public void removeStoreFiles(final String familyName, final Collection storeFiles)
      throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo,
        this.tableDir, Bytes.toBytes(familyName), storeFiles);
  }

再来查看 HFileArchiver.archiveStoreFiles()方法:

public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
      Path tableDir, byte[] family, Collection compactedFiles) throws IOException {
    if (fs == null) {
      LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
      deleteStoreFilesWithoutArchiving(compactedFiles);
      return;
    }
    //判断被合并文件列表compactedFiles是否为空,如果为空,直接返回
    if (compactedFiles.size() == 0) {
      LOG.debug("No store files to dispose, done!");
      return;
    }
    if (regionInfo == null || family == null) throw new IOException(
        "Need to have a region and a family to archive from.");
    //获取归档路径
    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
    if (!fs.mkdirs(storeArchiveDir)) {
      throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
          + Bytes.toString(family) + ", deleting compacted files instead.");
    }
    if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files.");
    StoreToFile getStorePath = new StoreToFile(fs);
    Collection storeFiles = Collections2.transform(compactedFiles, getStorePath);
    //调用resolveAndArchive()方法执行归档
    if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
      throw new IOException("Failed to archive/delete all the files for region:"
          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
    }
  }

 再来查看resolveAndArchive()方法,这里经过了多层调用,我们直接看最后一层的关键代码:第十二步中不是删除文件,而是通过rename()方法将旧文件移至了归档路径

      Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
      if (!fs.rename(archiveFile, backedupArchiveFile)) {
        LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
            + ", deleting existing file in favor of newer.");
        if (!fs.delete(archiveFile, false)) {
          throw new IOException("Couldn't delete existing archive file (" + archiveFile
              + ") or rename it to the backup file (" + backedupArchiveFile
              + ") to make room for similarly named file.");
        }
      }

最后再来查看一下HStore.compact()方法中最后调用的finishCompactionRequest()方法:Region将合并请求汇报至中端;删除filesCompacting中的待合并文件

  private void finishCompactionRequest(CompactionRequest cr) {
    //Region将合并请求汇报至中端
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    //删除filesCompacting中的待合并文件
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652560.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号