2021SC@SDUSC
Hadoop源码分析(七)—— NameNode实现(3)Hadoop源码分析(六)—— NameNode实现(1)
Hadoop源码分析(六)—— NameNode实现(2)
- Hadoop源码分析(七)—— NameNode实现(3)
- 5. FSEditLog 文件系统的编辑日志
- 5.1 内部类
- ① EditLogFileOutputStream
- ② EditLogFilelnputStream
- ③BlockTwo
- ④Transactionld
- 5.2 成员变量
- 5.3 成员方法
FSEditLog 所在的包为org.apache.hadoop.hdfs.server.namenode,该类主要用于 namenode 对 HDFS 的 namespace的修改操作进行日志记录。在namenode中,namespace (指文件系统中的目录树/文件等元数 据信息,但是不包括block信息)是被全部缓存在内存中的,所以一旦namenode重启或者宕机,这些元 数据信息都会丢失。那么,在namenode重启的时候必须要有一种方法来将整个namespace进行重建。namenode的当前的实现是将namespace信息记录到一个叫做fsimage的二进制文件中,当namenode重启的时候则根据读取这个fsimage文件中的信息来重建namespace目录树结构。但是,fsimage始终是磁 盘中的一个文件,不可能时时刻刻都与namenode内存中的数据结构保持同步,而是通过每隔一段时间 来更新一次fsimage文件,以此来保证fsimage跟namenode内存中的namespace的尽量同步。而在一个 新的fsimage和上一个fsimage之间的namenode的操作,都会被记录到editlog文件中,所以namenode 会对应着一个fsimage文件和一个editlog文件。FSEditLog类就是用来管理这个editlog文件的。
5.1 内部类 ① EditLogFileOutputStreamEditLogFileOutputStream用于将edit日志记录到本地磁盘中,该类继承自EditLogOutputStream抽象类。EditLogOutputStream 所在的包为 org.apache.hadoop.hdfs.server.namenode ,它进一步继承自 Outputstream 父类。EditLogOutputStream 的源代码如下:
private long numSync;
将输出流中的数据同步到editlog磁盘文件的次数。
private long totalTimeSync;
执行同步操作所花费的总时间。
EditLogOutputStream() throws lOException {
numSync = totalTimeSync = 0;
}
在构造方法中将上面的成员变量都初始化为0o
abstract String getName();
该方法用于取得输出流的名称。
abstract public void write(int b) throws IOException;
该方法用于向输出流中写入一个字节。
abstract void write(byte op, Writable ... writables) throws IOException;
该方法用于将一条日志记录写入到该输出流中。其中一条日志记录包括操作名称和一个Writable类 型的参数列表。
abstract void create () throws IOException;
创建并初始化一个新的用于保存日志记录的editlog文件。
abstract public void close() throws IOException;
关闭输出流对象。
abstract void setReadyToFlush() throws IOException;
准备对写入到该输出流中的所有数据执行flush操作。在执行flush的过程中,新的数据仍然可以写 入到该输岀流中。
abstract protected void f lush/kndSync () throws IOException;
将该输出流中所有准备好flush的数据同步到磁盘中。
abstract long length() throws IOException;
取得editlog日志文件的当前长度。取得editlog日志文件长度的目的是为了检查日志文件是否大到 需要需要启动一个检查点进程来进行检査点的处理。
public void flush() throws IOException {
numSync++;
long start = FSNamesystem.now();
flushAndSync();
long end = FSNamesystem.now();
totalTimeSync += (end - start);
该方法用于将输出流中的数据刷新同步到磁盘中,同时更新同步统计数据。
接下来看一下EditLogFileOutputStream内部静态类的具体实现:
private File file; //用于存储日志记录的本地文件。 private FileOutputStream fp; //存储日志记录的文件输出流对象。 private FileChannel fc; //执行sync同步操作的文件输出流所对应的文件通道对象。 private DataOutputBuffer bufCurrent; //当前用于执行写操作的数据缓冲区。 private DataOutputBuffer bufReady; //当前用于执行flush操作的数据缓冲区。 static ByteBuffer fill = ByteBuffer.allocateDirect(512); //预分配的大小为512的字节缓冲区。
EditLogFileOutputStream(File name) throws IOException {
super ();
file = name;
bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
bufReady = new DataOutputBuffer(sizeFlushBuffer);
RandomAccessFile rp = new RandomAccessFile(name, "rwn");
fp = new FileOutputStream(rp.getFD());
fc = rp.getChannel();
fc.position(fc.size());
}
在构造方法中完成对上面成员变量的初始化。被创建的bufCurrent和bufReady数据缓冲区的大小都 为512KB,之后根据文件的名称来创建一个随机读写文件的RandomAccessFile对象,然后根据 RandomAccessFile对象来进一步创建用于向文件的末尾追加数据的输出流,然后取得输出流对应的通道, 并将输出流通道的位置设置为文件输出流的当前位置。
String getName() {
return file.getPath();
}
将用于保存日志记录的文件的路径做为输出流的名称。
public void write(int b) throws IOException { bufCurrent.write(b);
}
void write(byte op, Writable ... writables) throws IOException {
write(op);
for(Writable w : writables) {
w.write(bufCurrent);
}
}
写入的数据会被保存到bufCurrent缓冲区中。
create方法的处理逻辑如下:
fc.truncate(0);
设置被创建的文件的大小,如果文件的大小大于0,则将文件中的全部字节删除掉。
fc.position(0);
设置文件的写入位置。
bufCurrent.writelnt(FSConstants.LAYOUT_VERSION);
将版本号信息写入到bufCurrent缓冲区中。
setReadyToFlush();
设置准备刷新状态。
flush ();
执行刷新,即将版本号信息写入到日志文件中。
setReadyToFlush方法的执行逻辑如下:
write (OP_INVALID);
在文件的末尾写入一个OPJNVALID标记。
DataOutputBuffer trap = bufReady; bufReady = bufCurrent; bufCurrent = tmp;
切换缓冲区,将bufCurrent切换为要执行flush操作的bufReady,同时将bufReady切换为接收写入
数据的 bufCurrent。
flushAndSync方法的执行逻辑如下:
preallocate();
如果需要的话,首先调用preallocate方法来为文件预分配内存。
bufReady.writeTo(fp);
将bufReady缓冲区中的数据写入到日志文件中。
bufReady.reset();
将bufReady缓冲区进行重置即删除缓冲区中的所有数据。
fc.force(false);
因为调用preallocate。方法对文件进行了预分配,也就没必要更新该文件元数据信息
fc.position(fc.position()-1);
设置文件通道的当前写入位置,删除写入到文件末尾的OPJNVALID标记。
preallocate方法用于为日志文件预分配一个大的数据块,以保证可持续的对事务操作进行记录。方 法的执行逻辑如下:
long position = fc.position();
获取文件的当前写入位置。
if (position + 4096 >= fc.size())
如果文件的当前位置加上4096大于等于文件的大小,则执行如下的预分配操作。
long newsize = position + 1024*1024;
将文件的大小增加1MB。
fill.position(0);
预分配缓冲区fill的写入位置为0。
int written = fc.write(fill, newsize);
为日志文件预分配1MB的字节。
long length() throws IOException {
return fc.size() + bufReady.size() + bufCurrent.size();
}
当前日志文件的大小=文件的实际大小+bufReady缓冲区的大小+bufCurrent缓冲区的大小。
File getFile() {
return file;
}
取得与该输出流所对应的文件对象。
close方法会在所有的写事务执行flush和sync操作之后被调用,方法的执行逻辑如下:
int bufSize = bufCurrent.size();
if (bufSize != 0) {
throw new lOException ("FSEditStreain has " + bufSize +" bytes still to be flushed and cannot "+"be closed.");
}
如果bufCurrent缓冲区的大小不为0,表明buflCurrent中还有未被flush的数据,则抛出对应的异常。
bufCurrent.close(); bufReady.close();
关闭 bufCurrent 和 bufReady 缓冲区。
fc.truncate(fc.position()); fp.close();
因为在调用setReadyToFlush方法结束时,写入了 OP_INVALID标记,所以需要将OPJNVALID标 记从日志文件中删除掉。
然后关闭日志文件对应的输出流管道。
bufCurrent = bufReady = null;
最后,将bufCurrent和bufReady设置为null,以便JVM的GC来回收资源。
② EditLogFilelnputStreamEditLogFileInputStream用于从本地磁盘中读取日志记录,该类继承自EditLoglnputStream抽象类。EditLoglnputStream 所在的包为 org.apache.hadoop.hdfs.server.namenode,它进一步继承自 InputStream 父 类。EditLoglnputStream的源代码如下:
abstract String getName();
取得输入流的名称。
public abstract int available() throws IOException;
估算出可以读取的数据的字节数。
public abstract int read() throws IOException;
从该输入流中读取一个字节。
public abstract int read(byte[] b, int off, int len) throws IOException;
从该输入流。任位置处开始读取len个字节到字节数组b中
public abstract void close() throws IOException;
关闭该输入流。
abstract long length() throws IOException;
返回当前读取的日志文件的大小。
接下来看一下EditLogFilelnputStream内部静态类的具体实现:
private File file; //与本地日志文件对应的File对象。 private FilelnputStream fStream; //日志文件对应的输入流对象。
EditLogFilelnputStream(File name) throws IOException {
file = name;
fStream = new FilelnputStream (name);
}
在构造方法中完成了对以上成员变量的初始化。
String getName() {
return file.getPath();
}
将本地日志文件所对应的路径字符串做为输入流的名称。
public int available() throws IOException {
return fStream.available();
}
返回从文件输入流中可以读取的字节数。
public int read() throws IOException ( return fStream.read(); }
从文件输入流中读取一个字节。
public int read(byte[] b, int off, int len) throws IOException {
return fStream.read(b, off, len);
}
从文件输入流的。flf位置处开始读取len个字节到字节数组b中。
public void close() throws IOException {
fStream.close();
}
关闭文集输入流。
long length() throws IOException {
return file.length();
}
返回日志文件的长度,其中包括日志文件本身的实际长度以及bufReady和bufCurrent两个缓冲区的 长度。
③BlockTwo该类实现了 Writable接口,它能够从以旧格式存储的块中读写数据。它的源代码如下:
long blkid;
数据Block的Id
long len;
数据Block的长度。
BlockTwo() {
blkid = 0;
len = 0;
}
在构造方法中将上面的两个成员变量都初始化为0。
public void write(DataOutput out) throws IOException {
out.writeLong(blkid);
out.writeLong(len);
}
public void readFields(Datalnput in) throws IOException {
this.blkid = in.readLong();
this.len = in.readLong();
}
实现Writable接口中的write和readFields方法,从而将blkid和len进行序列化。
④TransactionldTransactionld是对事务ID的封装类,它在FSEditLog类中主要是用来作为当前执行事务的线程对应 的线程局部变量的复制。它的源代码如下:
private static class TransactionId {
public long txid;
TransactionId(long value) {
this.txid = value;
}
}
5.2 成员变量
FSEditLog将对HDFS文件系统命名空间的修改操作分成14种,每一种操作都有对应的代号。对于 每一种操作,editlog B志文件中都会记录不同的信息,但是有一个共同的原则就是:记录的信息能够反 映这次修改操作使namespace所发生的变化,随后fsimage和editlog进行merge的时候,能够产生正确 的新的fsimage,从而保证namenode的namespace的完整性。
此处拿mkdir来举例:比如客户端对HDFS执行了一个mkdir的命令,当mkdir命令执行成功之后, FSEditLog会将一条记录信息添加到editlog日志文件中。该记录的格式如下:
path(String), timestamp(long), atime(long), username(String)t groupname(String), permission(short)
从这条日志记录中就可以反映出,该mkdir命令创建的目录的路径/目录的时间戳/目录最后被访问的 时间/目录所属的用户和组/目录的操作权限等信息。这样当fsimage跟editlog下一次进行merge后,新的 fsimage中就会多岀来一个目录,并且记录了目录的元数据信息。
private static final byte OP_INVALID = -1; //无效的操作。 private static final byte OP_ADD = 0; //添加文件的操作。 private static final byte OP_RENAME = 1; //对文件或者目录执行的重命名操作 private static final byte OP_DELETE = 2; //对文件或者目录执行的删除操作 private static final byte OP_MKDIR = 3; //创建目录的操作。 private static final byte OP_SET_REPLICATION = 4; //设置副本因子的操作 @Deprecated private static final byte OP_DATANODE_ADD = 5; @Deprecated private static final byte OP_DATANODE_REMOVE = 6; //上面的两个变量是为了保证向后的兼容性而存在的,第一个用于标识添加DataNode的操作,第二 个用于标识删除DataNode的操作。 private static final byte OP_SET_PERMISSIONS = 7; //设置权限的操作, private static final byte OP_SET_OWNER = 8; //设置所有者的操作。 private static final byte OP_CLOSE = 9; //写操作完成后的关闭操作。 private static final byte OP_SET_GENSTAMP = 10; //设置时间戳的操作。 private static final byte OP_SET_NS_QUOTA = 11; //设置文件系统命名空间的配额的操作。 private static final byte OP_CLEAR_NS_QUOTA = 12; //清除文件系统命名空间的配额的操作。 private static final byte OP_TIMES = 13; //设置一个文件的最后访问时间和修改时间的操作。 private static final byte OP_SET_QUOTA = 14; //设置文件系统命名空间和磁盘空间的配额信息的操作。 private static final byte OP_GET_DELEGATION_TOKEN = 18; //取得授权令牌的操作 private static final byte OP_RENEW_DELEGATION_TOKEN == 19; //重新取得授权令牌的操作。 private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //取消授权令牌的操作。 private static final byte OP_UPDATE_MASTER_KEY = 21; //更新master键值的操作。 private static int sizeFlushBuffer = 512*1024; //数据缓冲区的大小为512KB,该变量主要用于定义写入日志文件时所使用的bufCurrent和bufReady 两个数据缓冲区的大小。 private ArrayList5.3 成员方法editStreams = null; //用于记录日志的输出流列表。每个日志文件可能存在多个副本,每个副本都有一个对应的输出流。 private FSImage fsimage = null; //FSImage镜像对象。 private long txid = 0; //用于记录事务ID的単调递增计数器. private long synctxid = 0; //最后执行同步操作的事务的ID。 private long lastPrintTime; //最后一次将统计信息输出到日志文件时的时间。 private boolean isSyncRunning; //该变量用于标识当前是否正在执行同步操作。 ~ private long numTransactions; //用于记录事务的数量的统计变量。 private long numTransactionsBatchedlnSync; //用于记录在Sync操作中批量执行的事务的数量的统计变量。 private long totalTimeTransactions; //用于记录厄有的事务的执行总时间的统计变量; private NameNodelnstrumentation metrics; //用于记录NameNode的统计数据的统计变量。 private static final ThreadLocal myTransactionld = new ThreadLocal () { protected synchronized Transactionld initialvalue() { return new Transactionld(Long.MAX_VALUE); } }; //存储线程的当前事务的线程局部变量。
FSEditLog(FSImage image) {
fsimage = image;
isSyncRunning = false;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = FSNamesystem.now();
}
通过FSlmage来创建FSEditLog对象。在构造方法中将isSyncRunning设置为false,从而标记此时 FSEditLog未执行同步操作,之后调用NameNode的getNameNodeMetrics方法来初始化用于统计 NameNode状态信息的metrics成员变量,最后将lastPrintTime设置为系统的当前时间。
boolean isOpen() {
return getNumEditStreams() > 0;
}
该方法用于判断日志文件是否被打开了。
open方法会创建一个空的日志文件,同时初始化与日志文件对应的输出流对象,方法的执行逻辑如下:
numTransactions = totalTimeTransactions = numTransactionsBatchedlnSync = 0;
首先将统计变量初始化为0.
if (editStreams == null) editStreams = new ArrayList();
初始化FSEditLog对应的EditLogOutputStream输出流列表。
for (Iteratorit = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { StorageDirectory sd = it.next(); File eFile = getEditFile(sd); try { EditLogOutputStream eStream = new EditLogFileOutputStream(eFile); editStreams.add(eStream); } catch (IOException e) { FSNamesystem.LOG.warn("Unable to open edit log file " + eFile); it. remove (); } }
首先通过fsimage的diriterator方法来获得EDITS类型的存储目录所对应的迭代器,然后为每一个 存储目录下的日志文件创建一个对应的EditLogOutputStream对象,并将其添加到editStreams列表中。
close方法用于关闭EditLog日志,方法的执行逻辑如下:
while (isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
如果FSEditLog当前正在执行同步操作,则循环等待同步操作的结束。
if (editStreams == null) {
return;
}
如果editStreams列表为空,则表明EditLog日志文件没有被打开,方法直接返回。
printstatistics(true);
将统计信息输出到日志文件中。
numTransactions = totalTimeTransactions = numTransactionsBatchedlnSync = 0;
重新将统计变量设置为0。
for (int idx = 0; idx < editStreams.size(); idx++){
EditLogOutputStream eStream = editStreams.get(idx);
try {
eStream.setReadyToFlush();
eStream.flush();
eStream.close();
} catch (IOException e) {
processIOError(idx);
idx--;
}
}
editStreams.clear();
关闭保存在editStreams列表中的所有EditLogOutputStream输出流对象,之后将editStreams列表清 除掉。
private File getEditFile(StorageDirectory sd) {
return fsimage.getEditFile(sd);
}
通过调用fsimage的getEditFile方法来获得某个存储目录下的Edit日志文件。
private File getEditNewFile(StorageDirectory sd) {
return fsimage.getEditNewFile(sd);
}
通过调用fsimage的getEditNewFile方法来获得某个存储目录下的最新的Edit日志文件edit.new.
boolean existsNew() throws IOException {
for (Iterator it =
fsimage.diriterator(NameNodeDirType.EDITS); it.hasNext();) {
if (getEditNewFile(it.next()).exists()){
return true;
}
}
return false;
}
该方法用于判断存储目录下是否存在editmew文件。
synchronized long getEditLogSize() throws IOException{
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < editStreams.size(); idx++) {
long curSize = editStreams.get(idx).length();
assert (size == 0 || size = curSize) : "All streams must be the same";
size = curSize;
}
return size;
}
该方法用于取得edits日志文件的大小,每个EditLogOutputStream输出流所对应的edits日志文件的 大小必须相等。
public synchronized void createEditLogFile(File name) throws IOException {
EditLogOutputStream eStream = new EditLogFileOutputstream(name);
eStream.create();
eStream.close();
}
根据文件的名称来创建一个EditLog日志文件。首先根据文件名称来创建EditLogFileOutputStream对象,然后调用其create方法来创建EditLog H志文件,并写入版本号LAYOUT_VERSION信息,最后 将 EditLogFileOutputStream 输出流关闭。



