2021SC@SDUSC
目录
compact源码分析
2021SC@SDUSC
接上一篇继续分析
compact源码分析
现在来看第十一步——替换StoreFiles
查看replaceStoreFiles()方法:调用addCompactionResults()去除已被合并的文件compactedFiles,将合并后的文件添加到StoreFileManager的storefiles中;把filesCompacting中已被合并的文件删除
private void replaceStoreFiles(final CollectioncompactedFiles, final Collection result) throws IOException { this.lock.writeLock().lock(); try { this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock(); } finally { this.lock.writeLock().unlock(); } }
第十三步——归档旧文件,关闭旧文件上的Reader,并更新store大小
查看completeCompaction()方法:
@VisibleForTesting protected void completeCompaction(final CollectioncompactedFiles) throws IOException { try { notifyChangedReadersObservers(); LOG.debug("Removing store files after compaction..."); //遍历已被合并的文件,关闭其上的Reader for (StoreFile compactedFile : compactedFiles) { compactedFile.closeReader(true); } //归档,将旧的文件从原位置移到归档目录下 this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.error("Failed removing compacted files in " + this + ". Files we were trying to remove are " + compactedFiles.toString() + "; some of them may have been already removed", e); } //计算新的store大小 this.storeSize = 0L; this.totalUncompressedBytes = 0L; for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { StoreFile.Reader r = hsf.getReader(); if (r == null) { LOG.warn("StoreFile " + hsf + " has a null Reader"); continue; } this.storeSize += r.length(); this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } }
据此查看HRegionFileSystem.removeStoreFiles()方法 :可以看到它是通过HFileArchiver.archiveStoreFiles()方法来完成
public void removeStoreFiles(final String familyName, final CollectionstoreFiles) throws IOException { HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo, this.tableDir, Bytes.toBytes(familyName), storeFiles); }
再来查看 HFileArchiver.archiveStoreFiles()方法:
public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
Path tableDir, byte[] family, Collection compactedFiles) throws IOException {
if (fs == null) {
LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
+ Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
deleteStoreFilesWithoutArchiving(compactedFiles);
return;
}
//判断被合并文件列表compactedFiles是否为空,如果为空,直接返回
if (compactedFiles.size() == 0) {
LOG.debug("No store files to dispose, done!");
return;
}
if (regionInfo == null || family == null) throw new IOException(
"Need to have a region and a family to archive from.");
//获取归档路径
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ Bytes.toString(family) + ", deleting compacted files instead.");
}
if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files.");
StoreToFile getStorePath = new StoreToFile(fs);
Collection storeFiles = Collections2.transform(compactedFiles, getStorePath);
//调用resolveAndArchive()方法执行归档
if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
throw new IOException("Failed to archive/delete all the files for region:"
+ Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
}
}
再来查看resolveAndArchive()方法,这里经过了多层调用,我们直接看最后一层的关键代码:第十二步中不是删除文件,而是通过rename()方法将旧文件移至了归档路径
Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
if (!fs.rename(archiveFile, backedupArchiveFile)) {
LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
+ ", deleting existing file in favor of newer.");
if (!fs.delete(archiveFile, false)) {
throw new IOException("Couldn't delete existing archive file (" + archiveFile
+ ") or rename it to the backup file (" + backedupArchiveFile
+ ") to make room for similarly named file.");
}
}
最后再来查看一下HStore.compact()方法中最后调用的finishCompactionRequest()方法:Region将合并请求汇报至中端;删除filesCompacting中的待合并文件
private void finishCompactionRequest(CompactionRequest cr) {
//Region将合并请求汇报至中端
this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
if (cr.isOffPeak()) {
offPeakCompactionTracker.set(false);
cr.setOffPeak(false);
}
//删除filesCompacting中的待合并文件
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
}
}



