2021SC@SDUSC
Continuing the analysis from the previous post.
compact Source Code Analysis
Let's look at the HStore.compact() method invoked from HRegion.compact():
- Obtain the merge request CompactionRequest from the CompactionContext parameter compaction
- Record the compaction start time
- Make sure the compaction request is not empty
- Get the collection of files to be merged, filesToCompact, from cr
- Make sure filesToCompact is not empty
- Make sure filesCompacting contains all of the files to be merged
- Call CompactionContext's compact() method to perform the merge; it returns the newly written files
- Check the parameter hbase.hstore.compaction.complete (default true) to decide whether to complete the compaction. If it is false: create a StoreFile list sfs, iterate over the merged files newFiles, create a StoreFile and Reader for each file, close the Reader on the StoreFile, add the StoreFile to sfs, and return sfs, ending the method here. If it is true, skip all of that and continue
- Call moveCompactedFilesIntoPlace() to move the new files into place and create StoreFiles and Readers; the method returns a StoreFile list
- Write a Compaction record into the WAL
- Replace the StoreFiles
- Depending on the compaction type, increment the corresponding counters for system performance monitoring
- Archive the old files, close the Readers on them, and update the store size
- Log a completion message
- Return sfs
@Override
public List<StoreFile> compact(CompactionContext compaction,
    CompactionThroughputController throughputController, User user) throws IOException {
  assert compaction != null;
  List<StoreFile> sfs = null;
  // Obtain the merge request CompactionRequest from the CompactionContext
  CompactionRequest cr = compaction.getRequest();
  try {
    // Record the compaction start time
    long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
    // Make sure a selection of files has been made
    assert compaction.hasSelection();
    // Get the collection of files to be merged, filesToCompact, from cr
    Collection<StoreFile> filesToCompact = cr.getFiles();
    // Make sure filesToCompact is not empty
    assert !filesToCompact.isEmpty();
    // Make sure filesCompacting contains all of the files to be merged
    synchronized (filesCompacting) {
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }

    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + " into tmpdir=" + fs.getTempDir() + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));

    // Call CompactionContext.compact() to perform the merge; it returns the new files
    List<Path> newFiles = compaction.compact(throughputController, user);

    // hbase.hstore.compaction.complete decides whether to complete the compaction
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");
      // Create the StoreFile list sfs
      sfs = new ArrayList<StoreFile>(newFiles.size());
      // For each merged file, create a StoreFile and Reader, close the Reader,
      // and add the StoreFile to sfs
      for (Path newFile : newFiles) {
        StoreFile sf = createStoreFileAndReader(newFile);
        sf.closeReader(true);
        sfs.add(sf);
      }
      return sfs;
    }

    // Move the merged files into place and create StoreFiles and Readers
    sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
    // Write a Compaction record into the WAL
    writeCompactionWalRecord(filesToCompact, sfs);
    // Replace the StoreFiles
    replaceStoreFiles(filesToCompact, sfs);
    // Increment the counters matching the compaction type, for metrics monitoring
    if (cr.isMajor()) {
      majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
      majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
    } else {
      compactedCellsCount += getCompactionProgress().totalCompactingKVs;
      compactedCellsSize += getCompactionProgress().totalCompactedSize;
    }
    // Archive the old files, close their Readers, and update the store size
    completeCompaction(filesToCompact); // Archive old files & update store size.
    // Log the compaction-end message
    logCompactionEndMessage(cr, sfs, compactionStartTime);
    // Return sfs
    return sfs;
  } finally {
    finishCompactionRequest(cr);
  }
}
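One invariant worth noticing above is the guard at the top: every file in the request must already be tracked in filesCompacting, checked under a synchronized block. The following is a minimal sketch of that invariant using plain JDK types only; the class name, markCompacting(), and the String file names are hypothetical stand-ins (HBase uses StoreFile objects and Guava's Preconditions.checkArgument):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CompactGuardSketch {
    private final List<String> filesCompacting = new ArrayList<>();

    // Registers files as "being compacted", as selection does before compact() runs
    void markCompacting(Collection<String> files) {
        synchronized (filesCompacting) {
            filesCompacting.addAll(files);
        }
    }

    // Mirrors the guard in HStore.compact(): the request must be non-empty and
    // every requested file must already be registered in filesCompacting
    void checkRequest(Collection<String> filesToCompact) {
        if (filesToCompact.isEmpty()) {
            throw new IllegalArgumentException("nothing to compact");
        }
        synchronized (filesCompacting) {
            if (!filesCompacting.containsAll(filesToCompact)) {
                throw new IllegalArgumentException("request contains untracked files");
            }
        }
    }

    public static void main(String[] args) {
        CompactGuardSketch store = new CompactGuardSketch();
        store.markCompacting(List.of("hfile-a", "hfile-b"));
        store.checkRequest(List.of("hfile-a")); // passes: file is tracked
        try {
            store.checkRequest(List.of("hfile-z"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```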
That covers the basic flow of HStore.compact(); now let's look at some of the details.
Step 9: call moveCompactedFilesIntoPlace() to move the merged files into place and create StoreFiles and Readers; the method returns a StoreFile list.
Looking at moveCompactedFilesIntoPlace(): it first creates the StoreFile list sfs, then iterates over the merged files newFiles, calling moveFileIntoPlace() to move each newFile to its final location and create a StoreFile and Reader for it, and adds the resulting StoreFile to sfs.
private List<StoreFile> moveCompactedFilesIntoPlace(final CompactionRequest cr,
    List<Path> newFiles, User user) throws IOException {
  // Create the StoreFile list
  List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
  // Iterate over newFiles
  for (Path newFile : newFiles) {
    assert newFile != null;
    // Move newFile to its final location and create a StoreFile and Reader for it
    final StoreFile sf = moveFileIntoPlace(newFile);
    if (this.getCoprocessorHost() != null) {
      final Store thisStore = this;
      if (user == null) {
        getCoprocessorHost().postCompact(thisStore, sf, cr);
      } else {
        try {
          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              getCoprocessorHost().postCompact(thisStore, sf, cr);
              return null;
            }
          });
        } catch (InterruptedException ie) {
          InterruptedIOException iioe = new InterruptedIOException();
          iioe.initCause(ie);
          throw iioe;
        }
      }
    }
    assert sf != null;
    sfs.add(sf);
  }
  return sfs;
}
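A detail worth pausing on: the postCompact coprocessor hook runs under the requesting user's identity via doAs, and a checked InterruptedException is converted into an InterruptedIOException so it can propagate through the IOException-only signature. Below is a minimal self-contained sketch of that wrapping pattern using only the JDK; the class name and runPrivileged() are hypothetical, and calling action.run() directly stands in for user.getUGI().doAs(action):

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;

public class DoAsSketch {
    // Runs an action and translates InterruptedException into
    // InterruptedIOException, as moveCompactedFilesIntoPlace() does
    static String runPrivileged(final boolean interrupt) throws IOException {
        PrivilegedExceptionAction<String> action = new PrivilegedExceptionAction<String>() {
            @Override
            public String run() throws Exception {
                if (interrupt) {
                    throw new InterruptedException("interrupted while in doAs");
                }
                return "postCompact hook ran";
            }
        };
        try {
            return action.run(); // in HBase: user.getUGI().doAs(action)
        } catch (InterruptedException ie) {
            // Re-wrap so the method can keep a `throws IOException` signature
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runPrivileged(false));
        try {
            runPrivileged(true);
        } catch (InterruptedIOException iioe) {
            System.out.println("cause: " + iioe.getCause().getMessage());
        }
    }
}
```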
From there, look at moveFileIntoPlace(): it first validates the new file, then calls HRegionFileSystem.commitStoreFile() to move it to its final location, and finally calls createStoreFileAndReader() to create the StoreFile and Reader.
StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
  // Validate the new file
  validateStoreFile(newFile);
  // Move the file to its final location
  Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
  // Create the StoreFile and Reader
  return createStoreFileAndReader(destPath);
}
Looking at HRegionFileSystem.commitStoreFile(): it derives the store directory from the column family name familyName and checks that it exists, creating it if it does not; it takes the file name from buildPath (optionally generating a new unique name), builds the destination path dstPath from the store directory storeDir and the name, and finally moves the file from buildPath to dstPath via rename().
private Path commitStoreFile(final String familyName, final Path buildPath,
    final long seqNum, final boolean generateNewName) throws IOException {
  // Derive the store directory from the column family name familyName
  Path storeDir = getStoreDir(familyName);
  // If the directory does not exist in fs and cannot be created, throw
  if (!fs.exists(storeDir) && !createDir(storeDir))
    throw new IOException("Failed creating " + storeDir);
  String name = buildPath.getName();
  if (generateNewName) {
    name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
  }
  Path dstPath = new Path(storeDir, name);
  if (!fs.exists(buildPath)) {
    throw new FileNotFoundException(buildPath.toString());
  }
  LOG.debug("Committing store file " + buildPath + " as " + dstPath);
  if (!rename(buildPath, dstPath)) {
    throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
  }
  return dstPath;
}
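The commit step boils down to "ensure the destination directory exists, then rename the temp file into it". The following is a minimal sketch of the same pattern with plain java.nio.file instead of HDFS; the class and method names are hypothetical, and Files.move plays the role of rename(buildPath, dstPath):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CommitSketch {
    // Mimics HRegionFileSystem.commitStoreFile(): make sure the destination
    // directory exists, then move the built file into it under the same name
    static Path commitFile(Path buildPath, Path storeDir) throws IOException {
        if (!Files.exists(storeDir)) {
            Files.createDirectories(storeDir); // throws on failure
        }
        if (!Files.exists(buildPath)) {
            throw new IOException("not found: " + buildPath);
        }
        Path dstPath = storeDir.resolve(buildPath.getFileName());
        // The rename makes the new store file visible at its final location
        return Files.move(buildPath, dstPath);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("commit-sketch");
        Path build = Files.createFile(tmp.resolve("storefile.tmp"));
        Path store = tmp.resolve("cf"); // stands in for the column-family dir
        Path committed = commitFile(build, store);
        System.out.println(Files.exists(committed)); // true: file is in place
        System.out.println(Files.exists(build));     // false: temp file was moved
    }
}
```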
That concludes the analysis of step 9. Now on to step 10: writing the Compaction record into the WAL.
Looking at writeCompactionWalRecord(): it adds the paths of the compacted files to the inputPaths list and the paths of the merged output files to the outputPaths list, obtains the HRegionInfo, builds the compaction's descriptor CompactionDescriptor from them, and finally writes it into the WAL via writeCompactionMarker().
private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
    Collection<StoreFile> newFiles) throws IOException {
  if (region.getLog() == null) return;
  // Add the paths of the compacted (input) files to the inputPaths list
  List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
  for (StoreFile f : filesCompacted) {
    inputPaths.add(f.getPath());
  }
  // Add the paths of the merged (output) files to the outputPaths list
  List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
  for (StoreFile f : newFiles) {
    outputPaths.add(f.getPath());
  }
  // Get the HRegionInfo
  HRegionInfo info = this.region.getRegionInfo();
  // Build the compaction's descriptor CompactionDescriptor
  CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      family.getName(), inputPaths, outputPaths,
      fs.getStoreDir(getFamily().getNameAsString()));
  // Call writeCompactionMarker() to write a compaction marker into the WAL
  HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
Now look at the structure of CompactionDescriptor and the toCompactionDescriptor() method. CompactionDescriptor carries the key facts about the compaction: the table name table_name, the encoded region name encoded_region_name, the column family name family_name, and so on.
message CompactionDescriptor {
required bytes table_name = 1; // TODO: WALKey already stores these, might remove
required bytes encoded_region_name = 2;
required bytes family_name = 3;
repeated string compaction_input = 4;
repeated string compaction_output = 5;
required string store_home_dir = 6;
optional bytes region_name = 7; // full region name
}
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
    List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
// compaction descriptor contains relative paths.
// input / output paths are relative to the store dir
// store dir is relative to region dir
CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
.setTableName(ByteStringer.wrap(info.getTableName()))
.setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
.setFamilyName(ByteStringer.wrap(family))
.setStoreHomeDir(storeDir.getName()); //make relative
for (Path inputPath : inputPaths) {
builder.addCompactionInput(inputPath.getName()); //relative path
}
for (Path outputPath : outputPaths) {
builder.addCompactionOutput(outputPath.getName());
}
builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
return builder.build();
}
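Note that the descriptor stores only the last path component of each file (inputPath.getName()), so the WAL record is relative to the store directory. The following is a small sketch of that relativization with java.nio.file; the class name, helper, and sample paths are illustrative only:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class RelativePathSketch {
    // Collects just the file names, the way toCompactionDescriptor() calls
    // inputPath.getName() on each absolute store-file path
    static List<String> toRelativeNames(List<Path> absolutePaths) {
        List<String> names = new ArrayList<>();
        for (Path p : absolutePaths) {
            names.add(p.getFileName().toString()); // keep only the last component
        }
        return names;
    }

    public static void main(String[] args) {
        List<Path> inputs = List.of(
            Paths.get("/hbase/data/ns/table/region/cf/aaa111"),
            Paths.get("/hbase/data/ns/table/region/cf/bbb222"));
        // Only the names survive; the store dir locates them on replay
        System.out.println(toRelativeNames(inputs)); // [aaa111, bbb222]
    }
}
```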



