2021SC@SDUSC
Hadoop源码分析(十)——Datanode实现(2)Hadoop源码分析(九)——Datanode实现(1)
文章目录- Hadoop源码分析(十)——Datanode实现(2)
- 3. DatanodeInfo 类
- 3.1 成员变量
- 3.2 成员方法
- 4. BlockSender 数据块发送器
- 4.1 成员变量
- 4.2 成员方法
Datanodelnfo所在的包为 org.apache.hadoop.hdfs.protocol,该类继承自 DatanodelD 并且实现了 Node 接口。Datanodelnfo对象,存储了一个Datanode的相关状态信息,主要用于Datanode和Client之间的 communication。Node接口定义了网络拓扑结构中的一个结点。一个Node可能代表一个Datanode,也可 能代表数据中心或者机架内部的一个结点。在网络中的每一个data都有自己的name和location信息, 并且通过一个和文件名相似的字符串标识符来表示。例如,一个Datanode的名称格式为hostname:port, 如果该数据结点放在名为dog的数据中心的orange机架上,那么这个Datanode在整个网络中的位置就 表示为/dog/orange。Datanodelnfb的源代码如下:
3.1 成员变量protected long capacity;
//该变量代表此数据结点的总容量。
protected long dfsUsed;
//该变量代表此数据结点巳经使用的空间。
protected long remaining;
//该变量代表此数据结点未使用的空间o
protected long lastUpdate;
//该变量代表此数据结点最后的更新时间。
protected int xceiverCount;
//该变量代表与此数据结点相连的活动的连接数量。
protected String location = NetworkTopology.DEFAULT_RACK;
//该变量代表此数据结点在网络拓扑结构中的位置,默认为 "/default-rack"
protected String hostName = null;
//该变量代表此数据结点的主机名,主机名在Datanode注册它的名称的时候会被使用到。数据结点使 用它的1P地址作为它的name。
public enum AdminStates {NORMAL, //正常态
DECOMMISSION_INPROGRESS, //正在退役的状态
DECOMMISSIONED; //已经退彼的状态
}
该枚举类型定义了一个枚举类型来表示Datanode可监控的状态。
3.2 成员方法
public String getDatanodeReport(); public String dumpDatanode()
使用report方法来生成数据结点状态的格式化字符串,而使用print方法来打印数据结点状态的格式 化符串。
public void startDecommission() {
adminState = AdminStates.DECOMMISSION_INPROGRESS;
}
该方法用于开始将此Datanode标记为无效。
public void stopDecommission () {
adminState = null;
}
该方法用于终止将此Datanode标记为无效的过程。
public boolean isDecommissionInProgress() {
if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
return true;
}
return false;
}
public boolean isDecommissioned() {
if (adminState == AdminStates.DECOMMISSIONED) {
return true;
}
return false;
}
上面的两个方法用于判断此Datanode是否已经无效。
4. BlockSender 数据块发送器BlockSender 所在的包为 org.apache.hadoop.hdfs.server.Datanode,该类用于从本地文件中读取 Block 信息,并发送给接收者。BlockSender用数据包的方式向接收端发送数据,一个数据包可能包含若干个校 验数据块,但它并不需要接收端发送对数据包的确认帧,自己也不接受这些确认帧。在Datanode节点上, 主要有四个地方会用到数据块发送器BlockSender:
- 当用户向HDFS读取某一个文件时,客户端会根据数据所在的位置转向具体的Datanode节点并请求对应数据块的数据,此时Datanode节点会用BlockSender向该客户端发送数据;
- 当NameNode节点发现某个Block的副本不足时,它会要求某一个存储了该Block的Datanode节点向其他Datanode节点复制该Block,当然此时仍然会釆用流水线的复制方式,只不过数据来源 变成了 一个Datanode节点;
- HDFS开了一个调节Datanode负载均衡的工具Balacer,当它发现某一个Datanode节点存储的Block过多时,就会让这个Datanode节点转移一部分Blocks到新添加到集群的Datanode节点或者存储负载轻的Datanode节点上;
- Datanode节点在后台开启了一个用于对存储的所有Block进行扫描验证的后台线程,它会定期利用BlockSender来检查一个Block的数据是否损坏。
private Block block; //该变量代表将要读取的Block. private InputStream blockin; //该变量代表数据Block输入流。 private long blocklnPosition = -1; //该变量的值会在调用transferTo方法的时候被改变。 private DatalnputStream checksumin; //该变量代表校验和输入流。 private DataChecksum checksum; //该变量用于存储读取到的校验和。 private long offset; //该变量代表数据包中的数据在Block中的开始位置。 private long endoffset; //该变量代表数据包中的数据在Block中的结束位置。 private long blockLength; //该变量代表数据Block的长度。 private int bytesPerChecksum; //该变量代表读取的数据chunk的大小。 private int checksumSize; //该变量代表校验和的大小。 private boolean corruptChecksumOk; //该变量用于标识是否进行校验和验证。 private boolean chunkoffsetOK; //该变量用于标识是否需要发送数据chunk的偏移量。 private long seqno; //该变量用于标识数据包的序列号。 private boolean transferToAllowed = true; //该变量用于标识是否允许发送数据。 private boolean blockReadFully; //该变量用于标识是否读取完了整个数据Blocko private boolean verifyChecksum; //该变量用于标识是否需要进行校验和验证。4.2 成员方法
private int sendChunks(ByteBuffer pkt, int maxChunks, Outputstream out) throws IOException {
//计算数据包的长度
int len = Math.min((int) (endOffset - offset), bytesPerChecksum*maxChunks);
if (len == 0) {
return 0;
}
//计算这个数据包中应该包含有多少个校验数据块
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();
//数据包头部信息写入缓存
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
pkt.put((byte)((offset + len >= endoffset) ? 1 : 0));
pkt.putInt(len);
int checksumOff = pkt.position();
int checksumLen = numChunks * checksumsize;
byte[] buf = pkt.array();
//数据对应的校验和信息写入缓存
if (checksumSize > 0 && checksumIn != null) {
try {
checksumln.readFully(buf, checksumOff, checksumLen);
} catch (lOException e) {
LOG. warn (" Could not read or failed to veirfy checksum for data" + " at offset " + offset + " for block " + block. + " got : " + StringUtils.stringifyException(e));
IOUtils.closeStream(checksumIn);
checksumln = null;
if (corruptChecksumOk) {
if (checksumOff < checksumLen) {
Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
)
} else {
throw e;
}
}
}
int dataOff = checksumOff + checksumLen;
if (blocklnPosition < 0) {
//数据写入缓存
IOUtils.readFully(blockIn, buf, dataOff, len);
//对发送的数据验证校验和
if(verifyChecksum){
int dOff = dataOff;
int cOff = checksumOff;
int dLeft = len;
for (int i = 0; i < numChunks; i++){
checksum.reset();
int dLen = Math.min(dLeft, bytesPerChecksum);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) throw new ChecksumException("Checksum failed at " + (offset + len - dLeft), len);}
dLeft -= dLen;
dOff += dLen;
cOff += checksumsize;
}
}
}
try (
if (blockInPosition >= 0) {
SocketOutputStream sockOut = (SocketOutputStream)out;
//发送缓存的数据包
sockOut.write(buf, 0, dataOff);
sockOut.transferToFully(((FilelnputStream)blockIn).getChannel(), blockInPosition, len);
blocklnPosition += len;
} else {
//发送缓存的数据包
out.write(buf, 0, dataOff + len);
}
} catch (IOException e) {
throw ioeToSocketException(e);
}
if (throttler != null) {
//调整发送速度
throttler.throttle(packetLen);
}
return len;
}
该方法用于发送一个数据包。
long sendBlock(DataOutputStream out, Outputstream baseStream, BlockTransferThrottler throttler) throws IOException {
if( out == null ) {
throw new IOException( "out stream is null");
}
this.throttler = throttler;
long initialoffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
try {
try {
checksum.writeHeader(out); //发送校验器信息
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
int maxChunksPerPacket;
int pktSize = Datanode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
if (transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof Fileinputstream) {
FileChannel fileChannel = ((FileInputstream)blockIn).getChannel();
blocklnPosition = fileChannel.position ();
streamForSendChunks = baseStream;
//计算一个数据包最多包含多少个数据校验快块
maxChunksPerPacket = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO) + bytesPerChecksum -1)/bytesPerChecksum;
//计算一个数据包的大小
pktSize += checksumsize * maxChunksPerPacket;
} else {
//计算一个数据包最多包含多少个数据检验块
maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
//计算一个数据包的大小
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
//一个一个数据包发送数据
while (endoffset > offset) {
long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
offset += len;
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* checksumSize);
seqno++;
}
try {
out.writeInt(0);
out. flush ();
} catch (IOException e) ( //socket error
throw ioeToSocketException(e);
}
} finally {
if (clientTraceFmt != null) {
ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
}
close ();
}
blockReadFully = (initialOffset == 0 && offset >= blockLength);
return totalRead;
}
该方法用于向接收端发送数据块。



