栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021年山东大学软件工程应用与实践项目——Hadoop源码分析(七)

2021年山东大学软件工程应用与实践项目——Hadoop源码分析(七)

2021SC@SDUSC

Hadoop源码分析(七)—— NameNode实现(3)

Hadoop源码分析(六)—— NameNode实现(1)
Hadoop源码分析(六)—— NameNode实现(2)

文章目录
  • Hadoop源码分析(七)—— NameNode实现(3)
    • 5. FSEditLog 文件系统的编辑日志
      • 5.1 内部类
        • ① EditLogFileOutputStream
        • ② EditLogFilelnputStream
        • ③BlockTwo
        • ④Transactionld
      • 5.2 成员变量
      • 5.3 成员方法

5. FSEditLog 文件系统的编辑日志

FSEditLog 所在的包为org.apache.hadoop.hdfs.server.namenode,该类主要用于 namenode 对 HDFS 的 namespace的修改操作进行日志记录。在namenode中,namespace (指文件系统中的目录树/文件等元数 据信息,但是不包括block信息)是被全部缓存在内存中的,所以一旦namenode重启或者宕机,这些元 数据信息都会丢失。那么,在namenode重启的时候必须要有一种方法来将整个namespace进行重建。namenode的当前的实现是将namespace信息记录到一个叫做fsimage的二进制文件中,当namenode重启的时候则根据读取这个fsimage文件中的信息来重建namespace目录树结构。但是,fsimage始终是磁 盘中的一个文件,不可能时时刻刻都与namenode内存中的数据结构保持同步,而是通过每隔一段时间 来更新一次fsimage文件,以此来保证fsimage跟namenode内存中的namespace的尽量同步。而在一个 新的fsimage和上一个fsimage之间的namenode的操作,都会被记录到editlog文件中,所以namenode 会对应着一个fsimage文件和一个editlog文件。FSEditLog类就是用来管理这个editlog文件的。

5.1 内部类 ① EditLogFileOutputStream

EditLogFileOutputStream用于将edit日志记录到本地磁盘中,该类继承自EditLogOutputStream抽象类。EditLogOutputStream 所在的包为 org.apache.hadoop.hdfs.server.namenode ,它进一步继承自 Outputstream 父类。EditLogOutputStream 的源代码如下:

private long numSync;

将输出流中的数据同步到editlog磁盘文件的次数。

private long totalTimeSync;

执行同步操作所花费的总时间。

EditLogOutputStream() throws lOException {
numSync = totalTimeSync = 0;
}

在构造方法中将上面的成员变量都初始化为0o

abstract String getName();

该方法用于取得输出流的名称。

abstract public void write(int b) throws IOException;

该方法用于向输出流中写入一个字节。

abstract void write(byte op, Writable ... writables) throws IOException;

该方法用于将一条日志记录写入到该输出流中。其中一条日志记录包括操作名称和一个Writable类 型的参数列表。

abstract void create () throws IOException;

创建并初始化一个新的用于保存日志记录的editlog文件。

abstract public void close() throws IOException;

关闭输出流对象。

abstract void setReadyToFlush() throws IOException;

准备对写入到该输出流中的所有数据执行flush操作。在执行flush的过程中,新的数据仍然可以写 入到该输岀流中。

abstract protected void f lush/kndSync () throws IOException;

将该输出流中所有准备好flush的数据同步到磁盘中。

abstract long length() throws IOException;

取得editlog日志文件的当前长度。取得editlog日志文件长度的目的是为了检查日志文件是否大到 需要需要启动一个检查点进程来进行检査点的处理。

public void flush() throws IOException {
numSync++;
long start = FSNamesystem.now();
flushAndSync();
long end = FSNamesystem.now();
totalTimeSync += (end - start);

该方法用于将输出流中的数据刷新同步到磁盘中,同时更新同步统计数据。

接下来看一下EditLogFileOutputStream内部静态类的具体实现:

private File file;
//用于存储日志记录的本地文件。
private FileOutputStream fp;
//存储日志记录的文件输出流对象。
private FileChannel fc;
//执行sync同步操作的文件输出流所对应的文件通道对象。
private DataOutputBuffer bufCurrent;
//当前用于执行写操作的数据缓冲区。
private DataOutputBuffer bufReady;
//当前用于执行flush操作的数据缓冲区。
static ByteBuffer fill = ByteBuffer.allocateDirect(512);
//预分配的大小为512的字节缓冲区。
EditLogFileOutputStream(File name) throws IOException {
super ();
file = name;
bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
bufReady = new DataOutputBuffer(sizeFlushBuffer);
RandomAccessFile rp = new RandomAccessFile(name, "rwn");
fp = new FileOutputStream(rp.getFD());
fc = rp.getChannel();
fc.position(fc.size());
}

在构造方法中完成对上面成员变量的初始化。被创建的bufCurrent和bufReady数据缓冲区的大小都 为512KB,之后根据文件的名称来创建一个随机读写文件的RandomAccessFile对象,然后根据 RandomAccessFile对象来进一步创建用于向文件的末尾追加数据的输出流,然后取得输出流对应的通道, 并将输出流通道的位置设置为文件输出流的当前位置。

String getName() {
return file.getPath();
}

将用于保存日志记录的文件的路径做为输出流的名称。

public void write(int b) throws IOException { bufCurrent.write(b);
}
void write(byte op, Writable ... writables) throws IOException {
write(op);
for(Writable w : writables) {
w.write(bufCurrent);
}
}

写入的数据会被保存到bufCurrent缓冲区中。
create方法的处理逻辑如下:

fc.truncate(0);

设置被创建的文件的大小,如果文件的大小大于0,则将文件中的全部字节删除掉。

fc.position(0);

设置文件的写入位置。

bufCurrent.writelnt(FSConstants.LAYOUT_VERSION);

将版本号信息写入到bufCurrent缓冲区中。

setReadyToFlush();

设置准备刷新状态。

flush ();

执行刷新,即将版本号信息写入到日志文件中。

setReadyToFlush方法的执行逻辑如下:

write (OP_INVALID);

在文件的末尾写入一个OPJNVALID标记。

DataOutputBuffer trap = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;

切换缓冲区,将bufCurrent切换为要执行flush操作的bufReady,同时将bufReady切换为接收写入
数据的 bufCurrent。

flushAndSync方法的执行逻辑如下:

preallocate();

如果需要的话,首先调用preallocate方法来为文件预分配内存。

bufReady.writeTo(fp);

将bufReady缓冲区中的数据写入到日志文件中。

bufReady.reset();

将bufReady缓冲区进行重置即删除缓冲区中的所有数据。

fc.force(false);

因为调用preallocate。方法对文件进行了预分配,也就没必要更新该文件元数据信息

fc.position(fc.position()-1);

设置文件通道的当前写入位置,删除写入到文件末尾的OPJNVALID标记。
preallocate方法用于为日志文件预分配一个大的数据块,以保证可持续的对事务操作进行记录。方 法的执行逻辑如下:

long position = fc.position();

获取文件的当前写入位置。

if (position + 4096 >= fc.size())

如果文件的当前位置加上4096大于等于文件的大小,则执行如下的预分配操作。

long newsize = position + 1024*1024;

将文件的大小增加1MB。

fill.position(0);

预分配缓冲区fill的写入位置为0。

int written = fc.write(fill, newsize);

为日志文件预分配1MB的字节。

long length() throws IOException {
return fc.size() + bufReady.size() + bufCurrent.size();
}

当前日志文件的大小=文件的实际大小+bufReady缓冲区的大小+bufCurrent缓冲区的大小。

File getFile() {
return file;
}

取得与该输出流所对应的文件对象。

close方法会在所有的写事务执行flush和sync操作之后被调用,方法的执行逻辑如下:

int bufSize = bufCurrent.size();
if (bufSize != 0) {
throw new lOException ("FSEditStreain has " + bufSize +" bytes still to be flushed and cannot "+"be closed.");
}

如果bufCurrent缓冲区的大小不为0,表明buflCurrent中还有未被flush的数据,则抛出对应的异常。

bufCurrent.close();
bufReady.close();

关闭 bufCurrent 和 bufReady 缓冲区。

fc.truncate(fc.position());
fp.close();

因为在调用setReadyToFlush方法结束时,写入了 OP_INVALID标记,所以需要将OPJNVALID标 记从日志文件中删除掉。
然后关闭日志文件对应的输出流管道。

bufCurrent = bufReady = null;

最后,将bufCurrent和bufReady设置为null,以便JVM的GC来回收资源。

② EditLogFilelnputStream

EditLogFileInputStream用于从本地磁盘中读取日志记录,该类继承自EditLoglnputStream抽象类。EditLoglnputStream 所在的包为 org.apache.hadoop.hdfs.server.namenode,它进一步继承自 InputStream 父 类。EditLoglnputStream的源代码如下:

abstract String getName();

取得输入流的名称。

public abstract int available() throws IOException;

估算出可以读取的数据的字节数。

public abstract int read() throws IOException;

从该输入流中读取一个字节。

public abstract int read(byte[] b, int off, int len) throws IOException;

从该输入流。任位置处开始读取len个字节到字节数组b中

public abstract void close() throws IOException;

关闭该输入流。

abstract long length() throws IOException;

返回当前读取的日志文件的大小。

接下来看一下EditLogFilelnputStream内部静态类的具体实现:

private File file;
//与本地日志文件对应的File对象。
private FilelnputStream fStream;
//日志文件对应的输入流对象。
EditLogFilelnputStream(File name) throws IOException {
file = name;
fStream = new FilelnputStream (name);
}

在构造方法中完成了对以上成员变量的初始化。

String getName() {
return file.getPath();
}

将本地日志文件所对应的路径字符串做为输入流的名称。

public int available() throws IOException {
return fStream.available();
}

返回从文件输入流中可以读取的字节数。

public int read() throws IOException (
return fStream.read();
}

从文件输入流中读取一个字节。

public int read(byte[] b, int off, int len) throws IOException {
return fStream.read(b, off, len);
}

从文件输入流的。flf位置处开始读取len个字节到字节数组b中。

public void close() throws IOException {
fStream.close();
}

关闭文集输入流。

long length() throws IOException {
return file.length();
}

返回日志文件的长度,其中包括日志文件本身的实际长度以及bufReady和bufCurrent两个缓冲区的 长度。

③BlockTwo

该类实现了 Writable接口,它能够从以旧格式存储的块中读写数据。它的源代码如下:

long blkid;

数据Block的Id

long len;

数据Block的长度。

BlockTwo() {
blkid = 0;
len = 0;
}

在构造方法中将上面的两个成员变量都初始化为0。

public void write(DataOutput out) throws IOException {
out.writeLong(blkid);
out.writeLong(len);
}
public void readFields(Datalnput in) throws IOException {
this.blkid = in.readLong();
this.len = in.readLong();
}

实现Writable接口中的write和readFields方法,从而将blkid和len进行序列化。

④Transactionld

Transactionld是对事务ID的封装类,它在FSEditLog类中主要是用来作为当前执行事务的线程对应 的线程局部变量的复制。它的源代码如下:

private static class TransactionId {
public long txid;
TransactionId(long value) {
this.txid = value;
}
}
5.2 成员变量

FSEditLog将对HDFS文件系统命名空间的修改操作分成14种,每一种操作都有对应的代号。对于 每一种操作,editlog B志文件中都会记录不同的信息,但是有一个共同的原则就是:记录的信息能够反 映这次修改操作使namespace所发生的变化,随后fsimage和editlog进行merge的时候,能够产生正确 的新的fsimage,从而保证namenode的namespace的完整性。
此处拿mkdir来举例:比如客户端对HDFS执行了一个mkdir的命令,当mkdir命令执行成功之后, FSEditLog会将一条记录信息添加到editlog日志文件中。该记录的格式如下:

path(String), timestamp(long), atime(long), username(String)t groupname(String), permission(short)

从这条日志记录中就可以反映出,该mkdir命令创建的目录的路径/目录的时间戳/目录最后被访问的 时间/目录所属的用户和组/目录的操作权限等信息。这样当fsimage跟editlog下一次进行merge后,新的 fsimage中就会多岀来一个目录,并且记录了目录的元数据信息。

private static final byte OP_INVALID = -1;
//无效的操作。
private static final byte OP_ADD = 0;
//添加文件的操作。
private static final byte OP_RENAME = 1;
//对文件或者目录执行的重命名操作
private static final byte OP_DELETE = 2;
//对文件或者目录执行的删除操作
private static final byte OP_MKDIR = 3;
//创建目录的操作。
private static final byte OP_SET_REPLICATION = 4;
//设置副本因子的操作
@Deprecated private static final byte OP_DATANODE_ADD = 5;
@Deprecated private static final byte OP_DATANODE_REMOVE = 6;
//上面的两个变量是为了保证向后的兼容性而存在的,第一个用于标识添加DataNode的操作,第二 个用于标识删除DataNode的操作。
private static final byte OP_SET_PERMISSIONS = 7;
//设置权限的操作,
private static final byte OP_SET_OWNER = 8;
//设置所有者的操作。
private static final byte OP_CLOSE = 9;
//写操作完成后的关闭操作。
private static final byte OP_SET_GENSTAMP = 10;
//设置时间戳的操作。
private static final byte OP_SET_NS_QUOTA = 11;
//设置文件系统命名空间的配额的操作。
private static final byte OP_CLEAR_NS_QUOTA = 12;
//清除文件系统命名空间的配额的操作。
private static final byte OP_TIMES = 13;
//设置一个文件的最后访问时间和修改时间的操作。
private static final byte OP_SET_QUOTA = 14; 
//设置文件系统命名空间和磁盘空间的配额信息的操作。
private static final byte OP_GET_DELEGATION_TOKEN = 18;
//取得授权令牌的操作
private static final byte OP_RENEW_DELEGATION_TOKEN == 19;
//重新取得授权令牌的操作。
private static final byte OP_CANCEL_DELEGATION_TOKEN = 20;
//取消授权令牌的操作。
private static final byte OP_UPDATE_MASTER_KEY = 21;
//更新master键值的操作。
private static int sizeFlushBuffer = 512*1024;
//数据缓冲区的大小为512KB,该变量主要用于定义写入日志文件时所使用的bufCurrent和bufReady 两个数据缓冲区的大小。
private ArrayList editStreams = null;
//用于记录日志的输出流列表。每个日志文件可能存在多个副本,每个副本都有一个对应的输出流。
private FSImage fsimage = null;
//FSImage镜像对象。
private long txid = 0;
//用于记录事务ID的単调递增计数器.
private long synctxid = 0;
//最后执行同步操作的事务的ID。
private long lastPrintTime;
//最后一次将统计信息输出到日志文件时的时间。
private boolean isSyncRunning;
//该变量用于标识当前是否正在执行同步操作。	~
private long numTransactions;
//用于记录事务的数量的统计变量。	 	
private long numTransactionsBatchedlnSync;
//用于记录在Sync操作中批量执行的事务的数量的统计变量。	 	
private long totalTimeTransactions;
//用于记录厄有的事务的执行总时间的统计变量;
private NameNodelnstrumentation metrics;
//用于记录NameNode的统计数据的统计变量。
private static final ThreadLocal myTransactionld = new ThreadLocal () { protected synchronized Transactionld initialvalue() {
return new Transactionld(Long.MAX_VALUE);
}
};
//存储线程的当前事务的线程局部变量。
5.3 成员方法
FSEditLog(FSImage image) {
fsimage = image;
isSyncRunning = false;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = FSNamesystem.now();
}

通过FSlmage来创建FSEditLog对象。在构造方法中将isSyncRunning设置为false,从而标记此时 FSEditLog未执行同步操作,之后调用NameNode的getNameNodeMetrics方法来初始化用于统计 NameNode状态信息的metrics成员变量,最后将lastPrintTime设置为系统的当前时间。

boolean isOpen() {
return getNumEditStreams() > 0;
}

该方法用于判断日志文件是否被打开了。
open方法会创建一个空的日志文件,同时初始化与日志文件对应的输出流对象,方法的执行逻辑如下:

numTransactions = totalTimeTransactions = numTransactionsBatchedlnSync = 0;

首先将统计变量初始化为0.

if (editStreams == null)
editStreams = new ArrayList();

初始化FSEditLog对应的EditLogOutputStream输出流列表。

for (Iterator it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
StorageDirectory sd = it.next();
File eFile = getEditFile(sd);
try {
EditLogOutputStream eStream = new EditLogFileOutputStream(eFile); editStreams.add(eStream);
} catch (IOException e) {
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
it. remove ();
}
}

首先通过fsimage的diriterator方法来获得EDITS类型的存储目录所对应的迭代器,然后为每一个 存储目录下的日志文件创建一个对应的EditLogOutputStream对象,并将其添加到editStreams列表中。
close方法用于关闭EditLog日志,方法的执行逻辑如下:

while (isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}

如果FSEditLog当前正在执行同步操作,则循环等待同步操作的结束。

if (editStreams == null) {
return;
}

如果editStreams列表为空,则表明EditLog日志文件没有被打开,方法直接返回。

printstatistics(true);

将统计信息输出到日志文件中。

numTransactions = totalTimeTransactions = numTransactionsBatchedlnSync = 0;

重新将统计变量设置为0。

for (int idx = 0; idx < editStreams.size(); idx++){
EditLogOutputStream eStream = editStreams.get(idx);
try {
eStream.setReadyToFlush();
eStream.flush();
eStream.close();
} catch (IOException e) {
processIOError(idx);
idx--;
}
}
editStreams.clear();

关闭保存在editStreams列表中的所有EditLogOutputStream输出流对象,之后将editStreams列表清 除掉。

private File getEditFile(StorageDirectory sd) {
return fsimage.getEditFile(sd);
}

通过调用fsimage的getEditFile方法来获得某个存储目录下的Edit日志文件。

private File getEditNewFile(StorageDirectory sd) {
return fsimage.getEditNewFile(sd);
}

通过调用fsimage的getEditNewFile方法来获得某个存储目录下的最新的Edit日志文件edit.new.

boolean existsNew() throws IOException {
for (Iterator it =
fsimage.diriterator(NameNodeDirType.EDITS); it.hasNext();) {
if (getEditNewFile(it.next()).exists()){
return true;
}
}
return false;
}

该方法用于判断存储目录下是否存在editmew文件。

synchronized long getEditLogSize() throws IOException{
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < editStreams.size(); idx++) {
long curSize = editStreams.get(idx).length();
assert (size == 0 || size = curSize) : "All streams must be the same";
size = curSize;
}
return size;
}

该方法用于取得edits日志文件的大小,每个EditLogOutputStream输出流所对应的edits日志文件的 大小必须相等。

public synchronized void createEditLogFile(File name) throws IOException {
EditLogOutputStream eStream = new EditLogFileOutputstream(name);
eStream.create();
eStream.close();
}

根据文件的名称来创建一个EditLog日志文件。首先根据文件名称来创建EditLogFileOutputStream对象,然后调用其create方法来创建EditLog H志文件,并写入版本号LAYOUT_VERSION信息,最后 将 EditLogFileOutputStream 输出流关闭。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654332.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号