基于源码hadoop-3.3.0
1 概述
众所周知,dn主要是用来存储hadoop集群中的具体的数据的。但实际上,Datanode还是需要保存一部分Datanode自身的元数据的, 这些元数据是通过Datanode磁盘存储上的一些文件和目录来保存的。
Datanode可以定义多个存储目录保存数据块,Datanode的多个存储目录存储的数据块并不相同,并且不同的存储目录可以是异构的, 这样的设计可以提高数据块IO的吞吐率[比如多块磁盘]。
1.1 实际存储
下面看一个实际中的存储:
我们简单了解一下:
- BP-607470660-10.200.50.133-1568802623014:BP表示blockpool,所以这个目录是一个块池目录,块池目录保存了一个块池在当前存储目录下存储的所有数据块, 在Federation部署方式中, Datanode的一个存储目录会包含多个以“BP”开头的块池目录。 BP后面会紧跟一个唯一的随机块池ID, 在这个示例中就是607470660。 接下来的IP地址10.200.50.133是当前块池对应的Namenode的IP地址。 最后一个部分是这个块池的创建时间。
- VERSION文件:在current中存在一个VERSION文件,截图未显示,内容如下
块池目录的VERSION文件同样包含了文件系统布局版本(layoutVersion) 、 HDFS集群ID(clusterld) 以及创建时间(cTime) 等集群信息。 除此之外, 块池目录的VERSION文件还包含了以下信息。
-
- storageType:存储类型,这里是DATA_NODE
- current/BP-607470660-10.200.50.133-1568802623014/current目录:
-
- finalized/rbw:finalized和rbw目录都是用于存储数据块的, 包括数据块文件以及对应的校验和文件。 rbw(replica being written, 正在写入副本) 目录保存了正在由HDFS客户端写入当前Datanode的数据块。 finalized目录包含了已经完成写入操作的数据块, 由于这样的数据块可能非常多, 所以finalized目录会以特定的目录结构存储这些数据块。每个数据块对应 2 个文件,blk 文件存放数据,另外一个以 meta 结尾的存放校验和等元数据
- VERSION:这里会记录namespaceID,cTime,blockpoolId,layoutVersion
- current/BP-607470660-10.200.50.133-1568802623014/scanner.cursor:DataNode 会定期的对每个 blk 文件做校验,这个文件是用来记录校验到哪个位置的
- in_use.lock:被dn线程持有的锁文件,用于防止多个Datanode线程启动并且并发修改这个存储目录
- lazyPersist: HDFS 2.X中引入了一个新的特性, 用于支持将临时数据写入内存, 然后通过懒持久化(lazyPersist) 方式写入磁盘。 如果用户开启了这个特性,lazyPersist目录就用于将内存中的临时数据懒持久化到磁盘。
1.2 功能划分
Datanode最重要的功能就是管理磁盘上存储的HDFS数据块(block)。
Datanode将这个管理功能切分为两个部分:
① 管理与组织磁盘存储目录(由dfs.data.dir指定) , 如current、previous、 detach、 tmp等, 这个功能由DataStorage类实现;
②管理与组织数据块及其元数据文件, 这个功能主要由FsDatasetImpl(对应每一个存储目录)相关类实现。
本文主要介绍第一部分:磁盘存储目录的管理。
2 磁盘存储目录管理
在dn中负责管理磁盘存储的主要继承结构如下:
因此我们分别了解一下这几个概念。
2.1 Storage
根据注释,此类用于存储storage file信息,本地的storage 信息存储在VERSION文件中,主要包括node的类型,存储布局版本,namespaceId,fs state create time。
本地存储可以驻留在多个目录中。每个目录都应包含与其他目录相同的VERSION 文件。在启动Hadoop服务器(名称节点和数据节点)期间读取它们的本地存储来自他们的信息。
服务器在运行时为每个存储目录持有一个锁,以便其他节点无法启动共享相同的存储。当服务器停止(正常或异常)时释放锁。
Storage是一个抽象类, 为Datanode、 Namenode提供抽象的存储服务。 Storage类管理着当前节点上( 可以是Datanode或者Namenode) 所有的存储目录, 每个存储目录都由一个StorageDirectory对象管理,StorageDirectory类定义了存储目录上的通用操作。 由于HDFS 2.X版本引入了Federation机制 , Datanode会为多个块池保存数据块, HDFS定义了BlockPoolSliceStorage类来管理Datanode上的一个块池, 这个块池分布在Datanode配置的所有存储目录中。Storage用一个线性表字段storageDirs存储它管理的所有StorageDirectory, 并通过Dirlterator迭代器进行遍历。
private final ListstorageDirs = new CopyOnWriteArrayList<>();
2.1.1 Storage.StorageState
StorageState定义了存储空间可能出现的所有状态。 在升级、 回滚、 升级提交、 检查点等操作中, 节点(Datanode或者Namenode) 的存储空间可能出现各种异常, 例如误操作、 断电、 宕机等情况, 这个时候存储空间就可能处于某种中间状态, 引入中间状态, 有利于HDFS从错误中恢复过来。 存储状态的确定, 是在StorageDirectory.analyzeStorage()方法中进行的.
public enum StorageState {
NON_EXISTENT, // 存储不存在
NOT_FORMATTED, // 存储未格式化
COMPLETE_UPGRADE, // 完成升级
RECOVER_UPGRADE, // 恢复升级
COMPLETE_FINALIZE, // 完成升级提交
COMPLETE_ROLLBACK, // 完成回滚操作
RECOVER_ROLLBACK, // 恢复回滚
COMPLETE_CHECKPOINT, // 完成检查点操作
RECOVER_CHECKPOINT, // 恢复检查点操作
NORMAL; // 正常状态
}
2.1.2 Storage.StorageDirectory
我们知道Datanode和Namenode都可以定义多个存储目录来存储数据,StorageDirectory是Storage的内部类, 定义了管理存储目录的通用方法。
重点字段:
// 存储目录的根, 就是java.io.File文件。
final File root; // root directory
// 指示当前目录是否是共享的。
// 例如在HA部署中, 不同的Namenode之间共享存储目录,
// 或者在Federation部署中不同的块池之间共享存储目录。
final boolean isShared;
// 当前存储目录的类型。
final StorageDirType dirType; // storage dir type
// 独占锁, java.nio.FileLock类型,
// 用来支持Datanode或者Namenode线程独占存储目录的锁操作。
FileLock lock; // storage lock
// 权限信息
private final FsPermission permission;
// 存储目录的标识符。
private String storageUuid = null; // Storage directory identifier.
// 位置信息
private final StorageLocation location;
StorageDirectory的操作主要包含:
2.1.2.1 获取文件夹
主要是获取存储目录中的各个文件和目录的方法:
包括:
- current和current Version文件
- previous和previous Version文件
- previous tmp目录
- removed tmp目录
- finlized tmp目录
- lastCheckpoint tmp目录
- previous.checkpoint文件
public File getCurrentDir() {
if (root == null) {
return null;
}
return new File(root, STORAGE_DIR_CURRENT);
}
public File getVersionFile() {
if (root == null) {
return null;
}
return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION);
}
public File getPreviousVersionFile() {
if (root == null) {
return null;
}
return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION);
}
public File getPreviousDir() {
if (root == null) {
return null;
}
return new File(root, STORAGE_DIR_PREVIOUS);
}
public File getPreviousTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_PREVIOUS);
}
public File getRemovedTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_REMOVED);
}
public File getFinalizedTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_FINALIZED);
}
public File getLastCheckpointTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_LAST_CKPT);
}
public File getPreviousCheckpoint() {
if (root == null) {
return null;
}
return new File(root, STORAGE_PREVIOUS_CKPT);
}
2.1.2.2 加锁解锁操作
Datanode磁盘存储结构中,存储目录下会有一个in_use.lock文件,这个文件用于对当前存储目录加锁, 以保证Datanode进程对存储目录的独占使用。 当Datanode进程退出执行时, in_use.lock文件会被删除。 StorageDirectory提供了tryLock()与unlock()两个锁方法, 分别实现了对存储目录加锁以及解锁的功能。
StorageDirectory中真正进行加锁操作的是tryLock()方法。tryLock()方法会首先构造锁文件, 然后调用file.getChannel.lock()方法尝试获得存储目录的独占锁, 如果已经有进程占有锁文件, 那么file.getChannel.lock()就会返回一个null的引用, 表明有另一个节点运行在当前的存储目录上, tryLock()方法会抛出异常并退出执行。 如果加锁成功, tryLock()方法会在锁文件中写入虚拟机信息。
加锁成功后, tryLock()方法会调用deleteonExit()方法, 在Java虚拟机运行结束时删除in_use.lock文件。
@SuppressWarnings("resource")
FileLock tryLock() throws IOException {
boolean deletionHookAdded = false;
// 构造in_use.lock
File lockF = new File(root, STORAGE_FILE_LOCK);
// 如果in_use.lock创建失败
if (!lockF.exists()) {
lockF.deleteonExit();
deletionHookAdded = true;
}
RandomAccessFile file = new RandomAccessFile(lockF, "rws");
String jvmName = ManagementFactory.getRuntimeMXBean().getName();
FileLock res = null;
try {
// 给文件加锁
res = file.getChannel().tryLock();
// 已有程序获取了锁
if (null == res) {
LOG.error("Unable to acquire file lock on path {}", lockF);
throw new OverlappingFileLockException();
}
// 加锁成功,往文件中添加虚拟机信息
file.write(jvmName.getBytes(Charsets.UTF_8));
LOG.info("Lock on {} acquired by nodename {}", lockF, jvmName);
} catch(OverlappingFileLockException oe) {
// Cannot read from the locked file on Windows.
// 已有程序获取了锁,关闭锁文件
String lockingJvmName = Path.WINDOWS ? "" : (" " + file.readLine());
LOG.error("It appears that another node {} has already locked the "
+ "storage directory: {}", lockingJvmName, root, oe);
file.close();
return null;
} catch(IOException e) {
// 读取锁文件失败,关闭锁
LOG.error("Failed to acquire lock on {}. If this storage directory is"
+ " mounted via NFS, ensure that the appropriate nfs lock services"
+ " are running.", lockF, e);
file.close();
throw e;
}
// 在虚拟机任务运行成功后,删除锁文件
if (!deletionHookAdded) {
// If the file existed prior to our startup, we didn't
// call deleteonExit above. But since we successfully locked
// the dir, we can take care of cleaning it up.
lockF.deleteonExit();
}
return res;
}
public void unlock() throws IOException {
if (this.lock == null)
return;
this.lock.release();
lock.channel().close();
lock = null;
}
2.1.2.3 状态恢复操作
Datanode在执行升级、 回滚、 提交操作的过程中会出现各种异常, 例如误操作、 断电、 宕机等情况。
StorageDirectory提供了doRecover()和analyzeStorage()两个方法, Datanode会首先调用analyzeStorage()方法分析当前节点的存储状态, 然后根据分析所得的存储状态调用doRecover()方法执行恢复操作。
- analyzeStorage
public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
boolean checkCurrentIsEmpty)
throws IOException {
if (location != null &&
location.getStorageType() == StorageType.PROVIDED) {
// currently we assume that PROVIDED storages are always NORMAL
// 如果是外部存储 目前,我们假设提供的存储始终为“正常”
return StorageState.NORMAL;
}
assert root != null : "root is null";
boolean hadMkdirs = false;
String rootPath = root.getCanonicalPath();
// NON_EXISTENT: 以非FORMAT选项启动时,目录不存在;
// 或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。
try {
// check that storage exists
// 判断存储是否存在
if (!root.exists()) {
// storage directory does not exist
// 如果启动方式不是format和hotswap,则标识存储不存在
if (startOpt != StartupOption.FORMAT &&
startOpt != StartupOption.HOTSWAP) {
LOG.warn("Storage directory {} does not exist", rootPath);
return StorageState.NON_EXISTENT;
}
// directory无法创建
LOG.info("{} does not exist. Creating ...", rootPath);
if (!root.mkdirs()) {
throw new IOException("Cannot create directory " + rootPath);
}
hadMkdirs = true;
}
// 指定的存储是否是目录
// or is inaccessible
if (!root.isDirectory()) {
LOG.warn("{} is not a directory", rootPath);
return StorageState.NON_EXISTENT;
}
// 是否有写的权限
if (!FileUtil.canWrite(root)) {
LOG.warn("Cannot access storage directory {}", rootPath);
return StorageState.NON_EXISTENT;
}
} catch(SecurityException ex) {
// 权限异常
LOG.warn("Cannot access storage directory {}", rootPath, ex);
return StorageState.NON_EXISTENT;
}
// 存储存在,加锁
this.lock(); // lock storage if it exists
// If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory,
// while it also checks the layout version.
// 如果startOpt是hotswap,则状态未not_formatted
if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
(startOpt == StartupOption.HOTSWAP && hadMkdirs)) {
if (checkCurrentIsEmpty) {
checkEmptyCurrent();
}
return StorageState.NOT_FORMATTED;
}
// 启动方式是否为导入检查点
if (startOpt != HdfsServerConstants.StartupOption.import) {
storage.checkOldLayoutStorage(this);
}
// check whether current directory is valid
// 如果没有version file则创建,这里主要做判断是否存在
File versionFile = getVersionFile();
boolean hasCurrent = versionFile.exists();
// check which directories exist
boolean hasPrevious = getPreviousDir().exists();
boolean hasPreviousTmp = getPreviousTmp().exists();
boolean hasRemovedTmp = getRemovedTmp().exists();
// 检查是否有Finalized文件
boolean hasFinalizedTmp = getFinalizedTmp().exists();
boolean hasCheckpointTmp = getLastCheckpointTmp().exists();
// 各个指定的文件存在才执行下面的判断 ,如果current存在,则存储状态为normal
// 如果前一个文件也存在,则抛出非一致性异常,如果判断不通过,则标识为not_formatted
if (!(hasPreviousTmp || hasRemovedTmp
|| hasFinalizedTmp || hasCheckpointTmp)) {
// no temp dirs - no recovery
if (hasCurrent)
return StorageState.NORMAL;
if (hasPrevious)
throw new InconsistentFSStateException(root,
"version file in current directory is missing.");
if (checkCurrentIsEmpty) {
checkEmptyCurrent();
}
return StorageState.NOT_FORMATTED;
}
// 判断tmp目录存在多于多个,抛出非一致性异常
if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
+ (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
// more than one temp dirs
throw new InconsistentFSStateException(root,
"too many temporary directories.");
// # of temp dirs == 1 should either recover or complete a transition
// 标识状态是否完成checkpoint
if (hasCheckpointTmp) {
return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
: StorageState.RECOVER_CHECKPOINT;
}
// 判断是否完成finalize
if (hasFinalizedTmp) {
if (hasPrevious)
throw new InconsistentFSStateException(root,
STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
+ "cannot exist together.");
return StorageState.COMPLETE_FINALIZE;
}
if (hasPreviousTmp) {
if (hasPrevious)
throw new InconsistentFSStateException(root,
STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
+ " cannot exist together.");
// 如果current目录存在,标识完成更新,否则标识为recover_upgrade
if (hasCurrent)
return StorageState.COMPLETE_UPGRADE;
return StorageState.RECOVER_UPGRADE;
}
assert hasRemovedTmp : "hasRemovedTmp must be true";
if (!(hasCurrent ^ hasPrevious))
throw new InconsistentFSStateException(root,
"one and only one directory " + STORAGE_DIR_CURRENT
+ " or " + STORAGE_DIR_PREVIOUS
+ " must be present when " + STORAGE_TMP_REMOVED
+ " exists.");
if (hasCurrent)
return StorageState.COMPLETE_ROLLBACK;
return StorageState.RECOVER_ROLLBACK;
}
总结起来就是包含这几种状态:
NON_EXISTENT: 以非FORMAT选项启动时,目录不存在;或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。 NOT_FORMATTED: 以FORMAT选项启动时, 都为NOT_FORMATTED状态。 NORMAL: 没有tmp中间状态文件夹, 则存储目录为正常状态。 COMPLETE_UPGRADE: 存在current/VERSION文件, 存在previous.tmp文件夹,则存储目录为升级完成状态。 RECOVER_UPGRADE: 存在previous.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从升级中恢复。 COMPLETE_ROLLBACK: 存在removed.tmp文件夹, 也存在current/VERSION文件, 则存储目录的回滚操作成功完成。 RECOVER_ROLLBACK: 存在removed.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从回滚中恢复。 COMPLETE_FINALIZE: 存在finalized.tmp文件夹, 存储目录可以继续执行提交操作。
- doRecover
调用analyzeStorage()之后, Datanode就可以确定存储目录的状态了。 对于异常状态,可以通过调用doRecover()方法进行恢复, 使存储空间的状态恢复到NORMAL状态。
恢复过程源码中的注释很清晰。
public void doRecover(StorageState curState) throws IOException {
File curDir = getCurrentDir();
if (curDir == null || root == null) {
// at this point, we do not support recovery on PROVIDED storages
return;
}
String rootPath = root.getCanonicalPath();
switch(curState) {
case COMPLETE_UPGRADE: // mv previous.tmp -> previous
LOG.info("Completing previous upgrade for storage directory {}",
rootPath);
rename(getPreviousTmp(), getPreviousDir());
return;
case RECOVER_UPGRADE: // mv previous.tmp -> current
LOG.info("Recovering storage directory {} from previous upgrade",
rootPath);
if (curDir.exists())
deleteDir(curDir);
rename(getPreviousTmp(), curDir);
return;
case COMPLETE_ROLLBACK: // rm removed.tmp
LOG.info("Completing previous rollback for storage directory {}",
rootPath);
deleteDir(getRemovedTmp());
return;
case RECOVER_ROLLBACK: // mv removed.tmp -> current
LOG.info("Recovering storage directory {} from previous rollback",
rootPath);
rename(getRemovedTmp(), curDir);
return;
case COMPLETE_FINALIZE: // rm finalized.tmp
LOG.info("Completing previous finalize for storage directory {}",
rootPath);
deleteDir(getFinalizedTmp());
return;
case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
LOG.info("Completing previous checkpoint for storage directory {}",
rootPath);
File prevCkptDir = getPreviousCheckpoint();
if (prevCkptDir.exists())
deleteDir(prevCkptDir);
rename(getLastCheckpointTmp(), prevCkptDir);
return;
case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current
LOG.info("Recovering storage directory {} from failed checkpoint",
rootPath);
if (curDir.exists())
deleteDir(curDir);
rename(getLastCheckpointTmp(), curDir);
return;
default:
throw new IOException("Unexpected FS state: " + curState
+ " for storage directory: " + rootPath);
}
}
2.2 StorageInfo
StorageInfo用于表示存储的基本信息。
@InterfaceAudience.Private
public class StorageInfo {
// 存储系统布局版本号, 当节点存储的目录结构发生改变或者fsimage和editlog的格式发生改变时,
// 存储系统布局版本号会更新。 这个版本号一般是负数
public int layoutVersion; // layout version of the storage data
// 存储名称空间id,每一个bp对应一个
public int namespaceID; // id of the file system
// 系统的集群id
public String clusterID; // id of the cluster
// fs state的创建时间
public long cTime; // creation time of the file system state
// 存储类型:有DATA_NODE、 NAME_NODE、 JOURNAL_NODE等类型
protected final NodeType storageType; // Type of the node using this storage
// 上述提及的信息都存储在VERSION中
protected static final String STORAGE_FILE_VERSION = "VERSION";
}
2.3 DataStorage
DataStorage继承自Storage抽象类, 提供了管理Datanode存储空间的功能。
在HDFSFederation架构中, 一个Datanode可以保存多个命名空间的数据块,每个命名空间在Datanode磁盘上都拥有一个独立的块池( BlockPool),这个块池会分布在Datanode的所有存储目录下,它们共同保存了这个块池在当前Datanode上的所有数据块。HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间,DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池BlockPoolSliceStorage对象的引用。
// Maps block pool IDs to block pool storage private final MapbpStorageMap = Collections.synchronizedMap(new HashMap ());
dn在启动时会调用DataStorage提供的方法初始化Datanode的存储空间, 在HDFS Federation架构中, Datanode会保存多个命名空间的数据块。 对于每一个命名空间,Datanode都会构造一个BPOfferService类维护与这个命名空间Namenode的通信 。 当BPOfferService中的BPServiceActor类与该命名空间的Namenode握手成功后, 就会调用DataNode.initBlockPool()初始化该命名空间的块池。 DataNode.initBlockPool()方法中会调用DataNode的initStorage方法完成storage的初始化。而intiStorage中会调用DataStorage.recoverTransitionRead()来执行块池存储的初始化操作。且DataNode#initStorage初始化只在和第一个namenode的握手完成时完成一次(第一次握手是在dn启动中的connectToNNAndHandshake 中完成)。
void initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
if (nsInfo == null) {
throw new IOException("NamespaceInfo not found: Block pool " + bpos
+ " should have retrieved namespace info before initBlockPool.");
}
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// Register the new block pool with the BP manager.
// 通过BP manager注册新的block pool
blockPoolManager.addBlockPool(bpos);
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
// 调用此方法完成storage的初始化
initStorage(nsInfo);
try {
data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
} catch (AddBlockPoolException e) {
handleAddBlockPoolError(e);
}
// HDFS-14993: check disk after add the block pool info.
checkDiskError();
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
initDirectoryScanner(getConf());
initDiskBalancer(data, getConf());
}
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final FsDatasetSpi.Factory extends FsDatasetSpi>> factory
= FsDatasetSpi.Factory.getFactory(getConf());
if (!factory.isSimulated()) {
final StartupOption startOpt = getStartupOption(getConf());
if (startOpt == null) {
throw new IOException("Startup option not set.");
}
final String bpid = nsInfo.getBlockPoolID();
// read storage info, lock data dirs and transition fs state if necessary
// 初始化storage info,加载数据目录,必要时转换fs state
synchronized (this) {
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid={};bpid={};lv={};" +
"nsInfo={};dnuuid={}",
bpStorage.getNamespaceID(), bpid, storage.getLayoutVersion(),
nsInfo, storage.getDatanodeUuid());
}
// If this is a newly formatted DataNode then assign a new DatanodeUuid.
checkDatanodeUuid();
// 如果数据目录未初始化完成,则根据完成了初始化的storage初始化FsDatasetImpl实例
synchronized(this) {
if (data == null) {
data = factory.newInstance(this, storage, getConf());
}
}
}
在上述方法中调用recoverTransitionRead分析指定块池的存储目录,如果有需要,还可以从以前的状态中恢复,且这个方法同步时还需要在多个dn线程之间,只有第一个dn才可以执行dn级别的存储目录状态恢复或转换。
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection dataDirs, StartupOption startOpt) throws IOException {
if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
throw new IOException("All specified directories have failed to load.");
}
}
@VisibleForTesting
synchronized List addStorageLocations(
DataNode datanode,
NamespaceInfo nsInfo, Collection dataDirs,
StartupOption startOpt) throws IOException {
// 获取并行卷加载的线程数
final int numThreads = getParallelVolumeLoadThreadsNum(
dataDirs.size(), datanode.getConf());
// 创建一个线程池
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
// 并发执行加载数据卷,返回完成加载的数据目录
final List successLocations = loadDataStorage(
datanode, nsInfo, dataDirs, startOpt, executor);
return loadBlockPoolSliceStorage(
datanode, nsInfo, successLocations, startOpt, executor);
} finally {
executor.shutdown();
}
}
使用loadDataStorage加载数据卷:
private ListloadDataStorage(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt, ExecutorService executor) throws IOException { // 用于记录完成加载的数据卷 final List success = Lists.newArrayList(); final List tasks = Lists.newArrayList(); // 遍历数据目录 for (StorageLocation dataDir : dataDirs) { // 是否完成了加载 if (!containsStorageDir(dataDir)) { try { // It first ensures the datanode level format is completed. final List > callables = Lists.newArrayList(); // 加载一个数据目录,如果需要从过去状态中恢复 final StorageDirectory sd = loadStorageDirectory( datanode, nsInfo, dataDir, startOpt, callables); // 如果在loadStorageDirectory中存在状态转换即升级时callables不为空 if (callables.isEmpty()) { // 如果不是升级状态,那么将存储目录添加到storageDirs addStorageDir(sd); success.add(dataDir); } else { for(Callable c : callables) { tasks.add(new UpgradeTask(dataDir, executor.submit(c))); } } } catch (IOException e) { LOG.warn("Failed to add storage directory {}", dataDir, e); } } else { LOG.info("Storage directory {} has already been used.", dataDir); success.add(dataDir); } } // 将更新状态中的数据卷添加到storageDirs中 if (!tasks.isEmpty()) { LOG.info("loadDataStorage: {} upgrade tasks", tasks.size()); for(UpgradeTask t : tasks) { try { addStorageDir(t.future.get()); success.add(t.dataDir); } catch (ExecutionException e) { LOG.warn("Failed to upgrade storage directory {}", t.dataDir, e); } catch (InterruptedException e) { throw DFSUtilClient.toInterruptedIOException("Task interrupted", e); } } } return success; }
下面的代码为具体的分析加载数据目录的逻辑:
private StorageDirectory loadStorageDirectory(
DataNode datanode,
NamespaceInfo nsInfo, StorageLocation location, StartupOption startOpt,
List> callables) throws IOException {
// 根据指定地址创建一个存储目录
StorageDirectory sd = new StorageDirectory(null, false, location);
try {
// 分析当前存储目录的状态供后续操作
// 调用analyzeStorage()方法分析当前StorageDirectory的状态
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
switch (curState) {
// 如果状态正常,不做任何事
case NORMAL:
break;
// 如果不存在,则是抛出异常
case NON_EXISTENT:
LOG.info("Storage directory with location {} does not exist", location);
throw new IOException("Storage directory with location " + location
+ " does not exist");
// 未格式化则进行格式化
case NOT_FORMATTED: // format
LOG.info("Storage directory with location {} is not formatted for "
+ "namespace {}. Formatting...", location, nsInfo.getNamespaceID());
format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
break;
// 默认则从其他状态中恢复
default: // recovery part is common
sd.doRecover(curState);
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
// doTransition()方法判断如果启动选项是ROLLBACK, 则调用doRollback()方法
// 进行回滚操作。 如果存储目录记录的文件系统布局版本号(VERSION文件记录)与
// 内存中的版本号一致, 则Datanode正常启动; 如果存储目录记录的版本号小于
// 内存中的版本号, 则调用doUpgrade()方法升级(注意layoutVersion为负数) 。
if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
}
return sd;
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
2.4 总结
在HDFS Federation架构中, 一个Datanode可以保存多个块池的数据块, 每个块池的数据块都会分布在Datanode所有的存储目录下。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间, DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池的BlockPoolSliceStorage对象的引用。
DataStorage类管理着整个Datanode的存储, 包括Datanode定义的多个存储目录。BlockPoolSliceStorage类则管理着一个块池的存储, 包括分布在Datanode的多个存储目录下的块池目录。因此从设计上看,BlockPoolSliceStorage和DataStorage的功能类似,只不过一个是管理存储,一个是管理块池。



