2021SC@SDUSC
Hadoop源码分析(十一)—— Datanode实现(3)Hadoop源码分析(九)—— Datanode实现(1)
Hadoop源码分析(十)—— Datanode实现(2)
- Hadoop源码分析(十一)—— Datanode实现(3)
- 5. BlockReceiver 数据块接收器
- 5.1 成员变量
- 5.2 成员方法
- 6. DataBlockScanner 数据块扫描器
- 6.1 成员变量
- 6.2 成员方法
BlockReceiver 所在的包为 org.apache.hadoop.hdfs.server.Datanode,该类是 Datanode 节点上的数据块接收器。BlockReceiver的源代码如下:
5.1 成员变量private Block block; //该变量代表待接收的Block。 protected boolean finalized; //该变量用于标识接收过程是否结束。 private DatalnputStream in = null; //该变量代表Block的数据读取流。 private DataChecksum checksum; //该变量代表Block数据的校验器 private Outputstream out = null; //该变量代表Block数据的写入流(写入Datanode的本地磁盘) private DataOutputStream checksumOut = null; //该变量代表Block数据校验和写入流(写入Datanode的本地磁盘) private int bytesPerChecksum; //该变量代表数据校验块大小。 private int checksumsize; //该变量代表数据校验快对应的校验和大小。 private ByteBuffer buf; //该变量代表packet数据缓存块。 private int bufRead; //该变量代表缓存块中已读取的数据大小。 private int maxPacketReadLen; //该变量代表数据包读取数据的最大长度。 protected long offsetlnBlock; //该变量代表接受的packet在Block中的起始位置。 protected final String inAddr; //该变量用于标识发送端(Datanode/Client)的ip地址。 protected final String myAddr; //该变量用于标识当前接收端(Datanode)的ip地址。 private String mirrorAddr; //该变量代表下一个接收端(Datanode)的ip地址。 private DataOutputStream mirrorOut; //该变量标识向下一个接收端发送packet的写入流。 private Daemon responder = null; //该变量代表packet的响应器。 private FSDataset.BlockWriteStreams streams; //该变量代表Block写入流。 private boolean isRecovery = false; //该变量用于标识是否执行恢复操作。 private String clientName; //该变量代表发送数据的客户端。 Datanodelnfo srcDatanode = null; //该变量代表数据源所在的数据结点。 private Checksum partialCrc = null; //该变量代表校验和。5.2 成员方法
BlockReceiver (Block block, DataInputStream in, String inAddr, String myAddr, boolean isRecovery, String clientName, DatanodeInfo srcDatanode, Datanode Datanode) throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
this.isRecovery = isRecovery;
this.clientName = clientName;
this.offsetlnBlock = 0;
this.srcDatanode = srcDatanode;
this.Datanode = Datanode;
this.checksum = DataChecksum.newDataChecksum(in) ; //从头部信息中创建数据校验器
this.bytesPerChecksum = checksum.getBytesPerChecksum(); //数据校验块的大小
this.checksumSize = checksum.getChecksumSize(); //数据校验块对应的校大小
//为即将接受的Block数据创建临时存储空间(创建Block的数据文件和校验和文件)
streams = Datanode.data.writeToBlock(block, isRecovery);
this.finalized = Datanode.data.isValidBlock(block);
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream ( streams.checksumOut, SMALL_BUFFER_SIZE));
if (Datanode.blockscanner != null && isRecovery) { Datanode.blockscanner.deleteBlock(block);
}
}
} catch (BlockAlreadyExistsException bae) {
throw bae;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
IOException cause = FSDataset.getCauselfDiskError(ioe);
if (cause != null) { // possible disk error
ioe = cause;
Datanode.checkDiskError(ioe); // may throw an exception here
}
throw ioe;
}
}
通过构造方法完成BlockReceiver对象的初始化工作,这里的初始化是指为即将到来的Block数据申 请本地磁盘上的存储空间,并且根据接受的头部信息为该Block创建校验器,以及获取对应的校验配置信息。
private int readToBuf(int toRead) throws IOException {
if (toRead < 0) {
toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity()) - buf.limit();
}
//读取数据到buf中
int nRead = in.read(buf.array(), buf.limit(), toRead);
if (nRead < 0) {
throw new EOFException("while trying to read " + toread + " bytes");
}
bufRead = buf.limit() + nRead;
buf.limit(bufRead);
//返回本次读取数据的大小
return nRead;
}
该方法用于接收当前packet中的数据,并保存到缓存块中。
6. DataBlockScanner 数据块扫描器DataBlockScanner 所在的包为 org.apachc.hadoop.hdfs.server.Datanode,该类主要用于保证 Datanode 上数据块的完整性。此外Hadoop还提供了校验和的方式来保证数据的完整性。在Datanode节点上开启一个DataBlockScanner后台线程,来定期验证存储在其上的所有块,这个是防止物理介质出现损减情况而造成的数据损坏。DataBlockScanner的源代码如下:
6.1 成员变量private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; //该变量用于表示最大的扫描速度为8MB/So private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; //该变量用于表示最小的扫描速度为IMB/s。 static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; //该变量用于表示默认扫描周期是3周,扫描周期可通过配置$dfs.Datanode.scan.period.hours}来设置。 static final String verificationlogFile = "dncp_block_verification.log"; //该变量用于表示保存扫描日志信息的文件名前缀。其中在DataBlockScanner工作的过程中共产生两个日志:当前日志,文件名后缀是.curr;前一个日志,文件名后缀是.prev。 static final int verficationLogLimit = 5; //该变量用于表示每扫描5个数据块就进行一次日志记录操作。 FSDataset dataset; //该变量代表数据块管理器。 TreeSet6.2 成员方法blockInfoSet; //该变量代表数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到 期的数据块。 HashMap blockMap; //该变量代表数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息。 long totalScans = 0; //该变量代表扫描的总次数。 long totalverifications = 0; //该变量代表进行数据块验证的总次数。 long totalScanErrors =0; //该变量代表扫描出错的总次数。 long totalTransientErrors = 0; //该变量代表扫描过程中出现的短暂错误的总次数。 long bytesLeft = 0; //该变量代表一个扫描周期中还剩下需要扫描的数据量。 long totalBytesToScan = 0; //该变量代表一个扫描周期中需要扫描的总数据量。 private LogFileHandler verificationLog; //该变量代表数据块的扫描验证日志记录器。 BlockTransferThrottler throttler = null; //该变量代表扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量。 private static enum ScanType { //当某个数据块被远程客户端访问时,所进行的扫描操作 REMOTE_READ, //当进有数据块的完整,性验证时,所进行的扫描操作 VERIFTCATI ON_SCAN, NONE, } 该变量代表扫描操作的类型。
private void init() {
//从“磁盘”上获取所有的数据块基本信息
Block arr[] = dataset.getBlockReport();
Collections.shuffle(Arrays.asList(arr));
blockInfoSet = new TreeSet();
blockMap = new HashMap();
long scanTime = -1;
for (Block block : arr) {
//为每一个Block建立扫描验证信息
BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--;
addBlocklnfo(info);
}
//寻找一个合适的扫描验证日志文件
File dir = null;
FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
for(FSDataset.FSVolume vol : volumes) {
if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
dir = vol.getDir();
break;
}
}
if (dir == null) {
dir = volumes[0].getDir();
}
try {
//创建一个日志记录器
verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " + "Verification times are not stored.");
}
synchronized (this) {
//创建一个扫描速度控制器
throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
)
}
初始化方法主要完成的工作包括:为每一个Block创建对应的BlockScanlnfo对象,创建扫描日志记 录器,创建扫描速度控制器等。
private void updateBytesToScan(long len, long lastScanTime) {
totalBytesToScan += len;
//新添加的Block需要在需要在此次中扫描验证
if ( lastScanTime < currentPeriodstart ) {
bytesLeft += len;
}
}
该方法用于更新此次扫描操作需要扫描的字节数。
private synchronized void addBlockInfo(BlockScanInfo info) {
boolean added = blockInfoSet.add(info);
blockMap.put(info.block, info);
if ( added ) {
LogFileHandler log = verificationLog;
if (log != null) {
log.setMaxNumLines(blockMap.size() * verficationLogLimit);
}
updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
}
}
private synchronized void delBlocklnfo(BlockScanlnfo info) {
boolean exists = blockInfoSet.remove(info);
blockMap.remove(info.block);
if ( exists ) {
LogFileHandler log = verificationLog;
if (log != null) {
log.setMaxNumLines(blockMap.size() * verficationLogLimit);
}
updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
}
}
上面的两个方法分别用于添加和删除数据块对应的扫描信息。
private boolean assignlnitialVerificationTimes() {
int numBlocks = 1;
synchronized (this) {
numBlocks = Math.max(blockMap.size(), 1);
}
//读取数据块的验证日志文件
LogFileHandler.Reader logReader = null;
try {
if (verificationLog != null) {
logReader = verificationLog.new Reader(false);
}
} catch (IOException e) {
LOG.warn("Could not read previous verification times :" + StringUtils.stringifyException(e)); }
if (verificationLog != null) { verificationLog.updateCurNumLines();
}
try {
//用日志信息来更新记录的Block上一次验证时间
while (logReader != null && logReader.hasNext()) {
if (!Datanode.shouldRun || Thread.interrupted()) { return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlocklnfo(entry);
)
}
} finally {
IOUtils.closeStream(logReader);
}
//计算Blocks之间验证的间隔时间
long verifylnterval = (long) (Math.min( scanPeriod/2.O/numBlocks, 10*60*1000 ));
long lastScanTime = System.currentTimeMillis() - scanPeriod;
//初始化剩余Blocks的上一次验证时间
synchronized (this) {
if (blocklnfoSet.size() > 0 ) {
BlockScanlnfo info;
while ((info = blocklnfoSet.first()).lastScanTime < 0) {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlocklnfo(info);
}
}
}
return true;
该方法用于为每一个Block分配上一次验证的时间。
private synchronized void adjustThrottler() {
//本次扫描验证还剰余的时间
long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
//根据本次验证扫描剩余的工作量和时间来计算速度
long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
该方法用于调整扫描速度。在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的磁盘 I/O,为了不影响Datanode节点上其他线程的工作资源,同时也为了自身工作的有效性,所以 DataBlockScanner采用了扫描验证速度控制器,并根据当前的工作量来控制当前数据块的验证速度。



