在文档(8)中分析了namenode的构造方法,其中的initialize方法会加载namenode的元数据并启动一些服务。加载元数据是通过调用FSNamesystem类的loadFromDisk方法来实现的:在这个方法中会创建FSImage对象和FSNamesystem对象,并使用FSNamesystem对象加载元数据。
在文档(8)中详细分析了FSImage的创建,这里继续分析 FSNamesystem对象的创建。其构造方法如下:
/**
 * Creates the namesystem around an already constructed {@link FSImage}.
 *
 * <p>Reads namesystem settings from {@code conf} and creates the components
 * this namesystem owns: the key provider, the namesystem lock, the block
 * manager, block ID manager, delegation token secret manager, directory,
 * snapshot manager, cache manager, safe mode info, audit loggers and
 * (optionally) the retry cache. If anything inside the guarded section throws
 * an {@link IOException} or {@link RuntimeException}, the error is logged,
 * {@code close()} is called on the partially built namesystem, and the
 * exception is rethrown.
 *
 * @param conf configuration to read all settings from
 * @param fsImage the image stored in this namesystem's {@code fsImage} field
 * @param ignoreRetryCache if {@code true}, no retry cache is created and
 *     {@code retryCache} is left {@code null}
 * @throws IOException if the configuration is invalid (a shared edits dir is
 *     configured without HA, or the checksum type is unknown) or a component
 *     fails to initialize
 * @throws IllegalArgumentException if the lazy-persist file scrub interval is
 *     configured as zero
 */
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
    throws IOException {
  provider = DFSUtil.createKeyProviderCryptoExtension(conf);
  if (provider == null) {
    LOG.info("No KeyProvider found.");
  } else {
    LOG.info("Found KeyProvider: " + provider.toString());
  }
  if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
      DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
    LOG.info("Enabling async auditlog");
    enableAsyncAuditLog();
  }
  fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
  cond = fsLock.newWriteLockCondition();
  cpLock = new ReentrantLock();

  this.fsImage = fsImage;
  try {
    resourceRecheckInterval = conf.getLong(
        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);

    // Block management: the BlockManager tracks the cluster's block state.
    this.blockManager = new BlockManager(this, conf);
    this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
    this.blockIdManager = new BlockIdManager(blockManager);

    // Ownership and permission settings.
    this.fsOwner = UserGroupInformation.getCurrentUser();
    this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
        DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
    this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
        DFS_PERMISSIONS_ENABLED_DEFAULT);
    LOG.info("fsOwner = " + fsOwner);
    LOG.info("supergroup = " + supergroup);
    LOG.info("isPermissionEnabled = " + isPermissionEnabled);

    // block allocation has to be persisted in HA using a shared edits directory
    // so that the standby has up-to-date namespace information
    nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
    this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);

    // Sanity check the HA-related config.
    if (nameserviceId != null) {
      LOG.info("Determined nameservice ID: " + nameserviceId);
    }
    LOG.info("HA Enabled: " + haEnabled);
    if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
      LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
      throw new IOException("Invalid configuration: a shared edits dir " +
          "must not be specified if HA is not enabled.");
    }

    // Get the checksum type from config; keep the parse failure as the cause.
    String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
    DataChecksum.Type checksumType;
    try {
      checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
    } catch (IllegalArgumentException iae) {
      throw new IOException("Invalid checksum type in "
          + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr, iae);
    }

    this.serverDefaults = new FsServerDefaults(
        conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
        conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
        conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
        (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
        conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
        conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
        checksumType);

    this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
    this.minBlockSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
    this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
        DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
    this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
        DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
    this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
    LOG.info("Append Enabled: " + supportAppends);

    this.dtpReplaceDatanodeonFailure = ReplaceDatanodeOnFailure.get(conf);
    this.standbyShouldCheckpoint = conf.getBoolean(
        DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);

    // # edit autoroll threshold is a multiple of the checkpoint threshold
    this.editLogRollerThreshold = (long)
        (conf.getFloat(
            DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
            DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
         conf.getLong(
            DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
            DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
    this.editLogRollerInterval = conf.getInt(
        DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
        DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);

    this.lazyPersistFileScrubIntervalSec = conf.getInt(
        DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
        DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
    if (this.lazyPersistFileScrubIntervalSec == 0) {
      throw new IllegalArgumentException(
          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
    }

    // For testing purposes, allow the DT secret manager to be started regardless
    // of whether security is enabled.
    alwaysUseDelegationTokensForTests = conf.getBoolean(
        DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
        DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
    this.dtSecretManager = createDelegationTokenSecretManager(conf);

    // Namespace-side components: directory, snapshots, caching, safe mode.
    this.dir = new FSDirectory(this, conf);
    this.snapshotManager = new SnapshotManager(dir);
    this.cacheManager = new CacheManager(this, conf, blockManager);
    this.safeMode = new SafeModeInfo(conf);
    this.topConf = new TopConf(conf);
    this.auditLoggers = initAuditLoggers(conf);
    this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
        auditLoggers.get(0) instanceof DefaultAuditLogger;
    this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);

    // Optional pluggable INode attribute provider; none is used if unset.
    Class<? extends INodeAttributeProvider> klass = conf.getClass(
        DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
        null, INodeAttributeProvider.class);
    if (klass != null) {
      inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
      LOG.info("Using INode attribute provider: " + klass.getName());
    }
  } catch (IOException e) {
    LOG.error(getClass().getSimpleName() + " initialization failed.", e);
    close();
    throw e;
  } catch (RuntimeException re) {
    LOG.error(getClass().getSimpleName() + " initialization failed.", re);
    close();
    throw re;
  }
}
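这个方法很长,主要是根据配置对各个成员变量进行赋值。其中较为重要的是创建BlockManager对象的语句(`this.blockManager = new BlockManager(this, conf);`),BlockManager负责管理集群的块信息;还有就是创建FSDirectory对象的语句(`this.dir = new FSDirectory(this, conf);`),FSDirectory负责管理namenode的目录结构。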
接着继续分析FSNamesystem对象加载元数据的方法,即loadFSImage 方法。其内容如下:
/**
 * Loads the namespace from the FSImage held by this namesystem.
 *
 * <p>If {@code startOpt} is {@link StartupOption#FORMAT}, the storage is
 * formatted first (reusing the cluster ID determined from the existing
 * storage) and startup then continues as {@link StartupOption#REGULAR}.
 * Reading the image, optionally saving it, and opening the edit log all
 * happen while holding the namesystem write lock. If any of those steps
 * throws, the FSImage is closed before the exception propagates.
 *
 * @param startOpt the startup option the NameNode was launched with
 * @throws IOException propagated from formatting, reading or saving the
 *     image, or opening the edit log for write
 */
private void loadFSImage(StartupOption startOpt) throws IOException {
  final FSImage fsImage = getFSImage();

  // format before starting up if requested
  if (startOpt == StartupOption.FORMAT) {
    fsImage.format(this, fsImage.getStorage().determineClusterId()); // reuse current id
    startOpt = StartupOption.REGULAR;
  }

  boolean success = false;
  writeLock();
  try {
    MetaRecoveryContext recovery = startOpt.createRecoveryContext();
    // recoverTransitionRead's result tells us whether the loaded image is stale.
    final boolean staleImage =
        fsImage.recoverTransitionRead(startOpt, this, recovery);
    if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)
        || RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
      rollingUpgradeInfo = null;
    }

    // We shouldn't be calling saveNamespace if we've come up in standby state,
    // hence the !haEnabled term.
    final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
    LOG.info("Need to save fs image? " + needToSave
        + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
        + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
    if (needToSave) {
      fsImage.saveNamespace(this);
    } else {
      updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
          startOpt);
      // No need to save, so mark the phase done.
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAVING_CHECKPOINT);
      prog.endPhase(Phase.SAVING_CHECKPOINT);
    }

    // This will start a new log segment and write to the seen_txid file, so
    // we shouldn't do it when coming up in standby state. With HA enabled the
    // edit log is only opened for write here during an upgrade.
    if (!haEnabled
        || startOpt == StartupOption.UPGRADE
        || startOpt == StartupOption.UPGRADEONLY) {
      fsImage.openEditLogForWrite();
    }
    success = true;
  } finally {
    if (!success) {
      fsImage.close();
    }
    writeUnlock("loadFSImage");
  }
  imageLoadComplete();
}
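这个方法的重点是调用fsImage的recoverTransitionRead方法(`fsImage.recoverTransitionRead(startOpt, this, recovery)`),其返回值表示加载的镜像是否过时(staleImage)。这里的fsImage是通过getFSImage方法获得的,而getFSImage方法很简单,返回的就是FSNamesystem自身的fsImage属性;这个属性是在创建FSNamesystem对象时通过构造方法传入的(可参考上述构造方法中的`this.fsImage = fsImage;`),而传入的fsImage正是文档(8)中分析创建的对象。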
这里调用的recoverTransitionRead方法内容如下:
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
metaRecoveryContext recovery)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
Collection imageDirs = storage.getImageDirectories();
Collection editsDirs = editLog.getEditURIs();
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.import)
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
Map dataDirStates =
new HashMap();
boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);
if (LOG.isTraceEnabled()) {
LOG.trace("Data dir states:n " +
Joiner.on("n ").withKeyValueSeparator(": ")
.join(dataDirStates));
}
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.import) {
throw new IOException("NameNode is not formatted.");
}
int layoutVersion = storage.getLayoutVersion();
if (startOpt == StartupOption.metaDATAVERSION) {
System.out.println("HDFS Image Version: " + layoutVersion);
System.out.println("Software format version: " +
HdfsConstants.NAMENODE_LAYOUT_VERSION);
return false;
}
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
&& startOpt != StartupOption.UPGRADEonLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"nFile system image contains an old layout version "
+ storage.getLayoutVersion() + ".nAn upgrade to version "
+ HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.n"
+ "Please restart NameNode with the ""
+ RollingUpgradeStartupOption.STARTED.getOptionString()
+ "" option if a rolling upgrade is already started;"
+ " or restart NameNode with the ""
+ StartupOption.UPGRADE.getName() + "" option to start"
+ " a new upgrade.");
}
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// 2. Format unformatted dirs.
for (Iterator it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
throw new IOException(StorageState.NON_EXISTENT +
" state cannot be here");
case NOT_FORMATTED:
// Create a dir structure, but not the VERSION file. The presence of
// VERSION is checked in the inspector's needToSave() method and
// saveNamespace is triggered if it is absent. This will bring
// the storage state uptodate along with a new VERSION file.
// If HA is enabled, NNs start up as standby so saveNamespace is not
// triggered.
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty currrent dir
// For non-HA, no further action is needed here, as saveNamespace will
// take care of the rest.
if (!target.isHaEnabled()) {
continue;
}
// If HA is enabled, save the dirs to create a version file later when
// a checkpoint image is saved.
if (newDirs == null) {
newDirs = new HashSet();
}
newDirs.add(sd);
break;
default:
break;
}
}
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case import:
doimportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
throw new AssertionError("Rollback is now a standalone command, "
+ "NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
return loadFSImage(target, startOpt, recovery);
}
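这段代码的重点在最后一行:这里调用了FSImage类自身的loadFSImage(target, startOpt, recovery)方法来加载元数据(注意它与前面分析的FSNamesystem.loadFSImage(StartupOption)是两个不同的方法)。对于UPGRADE、UPGRADEONLY和IMPORT启动选项,方法会在第3步中提前返回false,不会执行到这一行。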



