本文基于hadoop-3.3.0
目录
1 概述
2 注册过程
2.1 注册前置过程
2.2 注册过程
2.3 心跳发送
1 概述
dn在完成启动(源码阅读)之后,需要向nn进行注册,才能提供后续的数据存储服务,注册完成后定时进行心跳检测,确保dn的存活。
而注册的流程是在DataNode的构造函数中的startDataNode方法中调用refreshNamenode方法完成:
blockPoolManager.refreshNamenodes(getConf());
2 注册过程
2.1 注册前置过程
先看refreshNamenodes方法,获取到新的NS和NNLifeRpcAddress后调用doRefreshNamenodes完成后续操作
void refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: " +
conf.get(DFSConfigKeys.DFS_NAMESERVICES));
Map> newAddressMap = null;
Map> newLifelineAddressMap = null;
try {
newAddressMap =
DFSUtil.getNNServiceRpcAddressesForCluster(conf);
newLifelineAddressMap =
DFSUtil.getNNLifelineRpcAddressesForCluster(conf);
} catch (IOException ioe) {
LOG.warn("Unable to get NameNode addresses.");
}
if (newAddressMap == null || newAddressMap.isEmpty()) {
throw new IOException("No services to connect, missing NameNode " +
"address.");
}
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
}
}
doRefreshNamenodes方法中主要包含5个步骤:
1. 对于每个新的名称服务,确定它是对现有 NS 的一组 NN 的更新,还是全新的名称服务
2. 我们目前拥有但不再存在的任何名称服务都需要删除
3. 开启新的ns
上面三步是在同步代码块中
4. 删除过时的ns,不在同步代码块,可能触发删除等操作,耗时多
5. 更新ns
在第三步的时候通过BPOfferService完成dn的注册和心跳(下一节)
private void doRefreshNamenodes(
Map> addrMap,
Map> lifelineAddrMap)
throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set toRefresh = Sets.newlinkedHashSet();
Set toAdd = Sets.newlinkedHashSet();
Set toRemove;
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
assert toRefresh.size() + toAdd.size() ==
addrMap.size() :
"toAdd: " + Joiner.on(",").useForNull("").join(toAdd) +
" toRemove: " + Joiner.on(",").useForNull("").join(toRemove) +
" toRefresh: " + Joiner.on(",").useForNull("").join(toRefresh);
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("").join(toAdd));
for (String nsToAdd : toAdd) {
Map nnIdToAddr = addrMap.get(nsToAdd);
Map nnIdToLifelineAddr =
lifelineAddrMap.get(nsToAdd);
ArrayList addrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList nnIds =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
nnIds.add(nnId);
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
}
// 根据传入的参数初始化BPOfferService,此方法是BlockPoolManager的成员函数,
// 而BlockPoolManager对象是在初始化DataNode的时候创建(构造函数中创建,创建时传入了dn)
BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
startAll();
}
// Step 4. Shut down old nameservices. This happens outside
// of the synchronized(this) lock since they need to call
// back to .remove() from another thread
if (!toRemove.isEmpty()) {
LOG.info("Stopping BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("").join(toRemove));
for (String nsToRemove : toRemove) {
BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
bpos.stop();
bpos.join();
// they will call remove on their own
}
}
// Step 5. Update nameservices whose NN list has changed
if (!toRefresh.isEmpty()) {
LOG.info("Refreshing list of NNs for nameservices: " +
Joiner.on(",").useForNull("").join(toRefresh));
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
Map nnIdToAddr = addrMap.get(nsToRefresh);
Map nnIdToLifelineAddr =
lifelineAddrMap.get(nsToRefresh);
ArrayList addrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList nnIds = Lists.newArrayListWithCapacity(
nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
nnIds.add(nnId);
}
try {
UserGroupInformation.getLoginUser()
.doAs(new PrivilegedExceptionAction
在这里调用BPOfferService的start方法,而后在start方法中调用BPServiceActor中的start方法
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction
通过BPServiceActor#start方法可以看出,使用this对象作为线程启动,即调用BPServiceActor#run()方法:
这里包括两个流程:
- dn注册:connectToNNAndHandshake
- 发送心跳:offerService
@Override
public void run() {
LOG.info(this + " starting to offer service");
try {
while (true) {
// init stuff
try {
// setup storage
// bp连接到namenode,注册datanode
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
runningState = RunningState.RUNNING;
if (initialRegistrationComplete != null) {
initialRegistrationComplete.countDown();
}
while (shouldRun()) {
try {
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}
// 注册dn到nn
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
// 这里会创建一个DatanodeProtocolClientSideTranslatorPB对象
// 此对象的作用是将client端的请求转到rpc server端上
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
this.bpThread.setName(formatThreadName("heartbeating", nnAddr));
// Second phase of the handshake with the NN.
// 第二阶段的握手
register(nsInfo);
}
// 发送心跳
// 此处略,查看2.3节
2.2 注册过程
上一节提到了注册过程的前提流程,在这里终于走到了dn的注册过程中:
在这个方法中会完成dn的注册
void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
// 创建dn的注册信息到指定的blockPool
DatanodeRegistration newBpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// Use returned registration from namenode with updated fields
// 调用DatanodeProtocolClientSideTranslatorPB#registerDatanode完成dn的注册
newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
newBpRegistration.setNamespaceInfo(nsInfo);
bpRegistration = newBpRegistration;
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
+ e.getLocalizedMessage());
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
} catch(RemoteException e) {
LOG.warn("RemoteException in register", e);
throw e;
} catch(IOException e) {
LOG.warn("Problem connecting to server: " + nnAddr);
}
// Try again in a second
sleepAndLogInterrupts(1000, "connecting to server");
}
if (bpRegistration == null) {
throw new IOException("DN shut down before block pool registered");
}
LOG.info(this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
// reset lease id whenever registered to NN.
// ask for a new lease id at the next heartbeat.
fullBlockReportLeaseId = 0;
// random short delay - helps scatter the BR from all DNs
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true);
}
此方法完成dn的注册,过程中由于RpcEngine是使用的Protobuf,因此注册完成后会使用hadoop自行封装的PBHelper.convert将注册完成后的响应信息中的注册信息从protobuf中转换成DatanodeRegistration
// DatanodeProtocolClientSideTranslatorPB#registerDatanode
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
) throws IOException {
// 封装注册信息,发送到rpc server(NameNodeRpcServer)
RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration));
RegisterDatanodeResponseProto resp;
try {
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getRegistration());
}
而后rpc server端接收到代理请求,这里是NameNodeRpcServer
// NameNodeRpcServer#registerDatanode
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
checkNNStartup();
verifySoftwareVersion(nodeReg);
namesystem.registerDatanode(nodeReg);
return nodeReg;
}
真实调用FSNamesystem的registerDatanode方法
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
writeLock();
try {
blockManager.registerDatanode(nodeReg);
} finally {
writeUnlock("registerDatanode");
}
}
// 而后调用blockManager#registerDataNode
public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
assert namesystem.hasWriteLock();
// 调用DatanodeManager#registerDatanode完成最后的注册操作
datanodeManager.registerDatanode(nodeReg);
// 注册完成后还需要检测一下blockManager的安全模式
bmSafeMode.checkSafeMode();
}
最后看下DatanodeManager#registerDatanode方法,这里会根据三种不同的情况分别完成注册:
- 数据节点已经存在,但是用于服务新的数据存储,因此移除现在的
- 重复注册的情况,主要就是更新信息
- 处理从未注册的过的新节点注册的情况
public void registerDatanode(DatanodeRegistration nodeReg)
throws DisallowedDatanodeException, UnresolvedTopologyException {
InetAddress dnAddress = Server.getRemoteIp();
if (dnAddress != null) {
// Mostly called inside an RPC, update ip and peer hostname
String hostname = dnAddress.getHostName();
String ip = dnAddress.getHostAddress();
if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
// Reject registration of unresolved datanode to prevent performance
// impact of repetitive DNS lookups later.
final String message = "hostname cannot be resolved (ip="
+ ip + ", hostname=" + hostname + ")";
LOG.warn("Unresolved datanode registration: " + message);
throw new DisallowedDatanodeException(nodeReg, message);
}
// update node registration with the ip and hostname from rpc request
nodeReg.setIpAddr(ip);
nodeReg.setPeerHostName(hostname);
}
try {
nodeReg.setExportedKeys(blockManager.getBlockKeys());
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
if (!hostConfigManager.isIncluded(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
+ nodeReg + " storage " + nodeReg.getDatanodeUuid());
DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
nodeReg.getIpAddr(), nodeReg.getXferPort());
// 数据节点已经存在,但是用于服务新的数据存储,因此移除现在的
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
}
// 重新注册的情况
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+ " is replaced by " + nodeReg + " with the same storageID "
+ nodeReg.getDatanodeUuid());
}
boolean success = false;
// 更新一些dn描述信息
try {
// update cluster map
getNetworkTopology().remove(nodeS);
if(shouldCountVersion(nodeS)) {
decrementVersionCount(nodeS.getSoftwareVersion());
}
nodeS.updateRegInfo(nodeReg);
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
} else {
nodeS.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
nodeS.setDependentHostNames(
getNetworkDependenciesWithDefault(nodeS));
}
getNetworkTopology().add(nodeS);
resolveUpgradeDomain(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
startAdminOperationIfNecessary(nodeS);
success = true;
} finally {
if (!success) {
removeDatanode(nodeS);
wipeDatanode(nodeS);
countSoftwareVersions();
}
}
return;
}
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
try {
// 接下来处理完全新的注册节点
// resolve network location
//解析网络信息,将其加入集群的网络拓扑中
if(this.rejectUnresolvedTopologyDN) {
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
} else {
nodeDescr.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
nodeDescr.setDependentHostNames(
getNetworkDependenciesWithDefault(nodeDescr));
}
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
resolveUpgradeDomain(nodeDescr);
// register new datanode
// 这里调用addDatanode方法将dn添加到网络拓扑中
addDatanode(nodeDescr);
blockManager.getBlockReportLeaseManager().register(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
// 这里将注册完成的dn添加到heartbeatManager中的dn数组中,便于后续维护dns的心跳
heartbeatManager.addDatanode(nodeDescr);
heartbeatManager.updateDnStat(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startAdminOperationIfNecessary(nodeDescr);
success = true;
} finally {
if (!success) {
removeDatanode(nodeDescr);
wipeDatanode(nodeDescr);
countSoftwareVersions();
}
}
} catch (InvalidTopologyException e) {
// If the network location is invalid, clear the cached mappings
// so that we have a chance to re-add this DataNode with the
// correct network location later.
List invalidNodeNames = new ArrayList<>(3);
// clear cache for nodes in IP or Hostname
invalidNodeNames.add(nodeReg.getIpAddr());
invalidNodeNames.add(nodeReg.getHostName());
invalidNodeNames.add(nodeReg.getPeerHostName());
dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
throw e;
}
}
addDatanode方法
void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
// 在后面将node添加到host2DatanodeMap中之前先将node移除,防止冲突
synchronized(this) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
// 检查集群是否是多机架
checkIfClusterIsNowMultiRack(node);
resolveUpgradeDomain(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node + " is added to datanodeMap.");
}
}
2.3 心跳发送
根据2.1节中的run方法,心跳发送主要是BPServiceActor中的offerService方法
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using"
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msecs"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msecs"
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msecs"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
//
// Now loop for a long time....
//
while (shouldRun()) {
try {
DataNodeFaultInjector.get().startOfferService();
final long startTime = scheduler.monotonicNow();
//
// Every so often, send heartbeat or block-report
// 是否发送心跳,通过初始化Scheduler时设置的nextHeartbeatTime(
// nextHeartbeatTime + heartbeatIntervalMs(默认3s))
// - startTime 是否小于等于0判断
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
if (!dn.areHeartbeatsDisabledForTests()) {
// 发送心跳
resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
if (resp.getFullBlockReportLeaseId() != 0) {
if (fullBlockReportLeaseId != 0) {
LOG.warn(nnAddr + " sent back a full block report lease " +
"ID of 0x" +
Long.toHexString(resp.getFullBlockReportLeaseId()) +
", but we already have a lease ID of 0x" +
Long.toHexString(fullBlockReportLeaseId) + ". " +
"Overwriting old lease ID.");
}
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
getRpcMetricSuffix());
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
commandProcessingThread.enqueue(resp.getCommands());
}
}
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately()|| sendHeartbeat)) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
List cmds = null;
boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcing a full block report to " + nnAddr);
}
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
commandProcessingThread.enqueue(cmds);
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd = cacheReport();
commandProcessingThread.enqueue(cmd);
}
if (sendHeartbeat) {
dn.getMetrics().addHeartbeatTotal(
scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
}
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn(this + " is shutting down", re);
shouldServiceRun = false;
return;
}
LOG.warn("RemoteException in offerService", re);
sleepAfterException();
} catch (IOException e) {
LOG.warn("IOException in offerService", e);
sleepAfterException();
} finally {
DataNodeFaultInjector.get().endOfferService();
}
processQueueMessages();
} // while (shouldRun())
} // offerService
进入BPServiceActor#sendHeartBeat
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
throws IOException {
scheduler.scheduleNextHeartbeat();
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
final long now = monotonicNow();
scheduler.updateLastHeartbeatTime(now);
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
final SlowPeerReports slowPeers =
outliersReportDue && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
// 获取各个disk的延迟数据,包括readio writeio metadataio
final SlowDiskReports slowDisks =
outliersReportDue && dn.getDiskMetrics() != null ?
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
// 发送心跳,bpNamenode是DatanodeProtocolClientSideTranslatorPB实例,
// 最终调用namenodeRpcServer的sendHeartbeat
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextOutlierReport();
}
return response;
}
而后bpNamenode.sendHeartBeat发送心跳,NameNodeRpcServer接受到心跳:
// NameNodeRpcServer#sendHeartbeat
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
// 这里调用FSNamesystem#handleHeartbeat
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers, slowDisks);
}
FSNamesystem#handleHeartbeat方法处理心跳,在此过程中主要是调用DatanodeManager#handleHeartbeat
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
readLock();
try {
//get datanode commands
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
// 这里调用DatanodeManager#handleHeartbeat方法处理心跳,返回心跳过程中携带的命令
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
slowPeers, slowDisks);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
getFSImage().getCorrectLastAppliedOrWrittenTxId());
// 返回一个心跳响应
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
blockReportLeaseId);
} finally {
readUnlock("handleHeartbeat");
}
}
在DatanodeManager#handleHeartbeat中心跳的具体流程如下:
- 先获取datanode的信息,判断是否允许连接(比如在exclude中),如果不允许的话,直接抛出异常。
- 判断是否注册过,如果没注册过,直接返回注册命令
- 更新datanode的信息,主要就是更新DatanodeDescriptor中的信息,如使用空间,剩余空间等。
- 检查是否处于安全模式
- 获取块恢复命令:blockRecoverCommand
- 针对block执行处理命令
- 获取numReplicationTasks个普通block做后续处理
- 判断block是否被移除
- 生成复制命令
- 获取numECTasks个ecblock做后续处理
- 获取numReplicationTasks个普通block做后续处理
- 生成删除的命令
- 生成缓存相关的命令
- 生成带宽相关的命令
- 返回所有的命令
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
// 先获取datanode的信息
nodeinfo = getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
// 1.判断是否允许连接,如果不允许的话,直接抛出异常
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
// 2. 判断是否注册过,如果没注册过,直接返回注册命令
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// 3.更新dn信息
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
// 4.检查是否处于安全模式
if (namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
// block recovery command
// 5. 获取blockRecoverCommand
final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
nodeinfo);
if (brCommand != null) {
return new DatanodeCommand[]{brCommand};
}
final List cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
// replication and erasure-coded block queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
// 6. 针对block执行处理命令
if (totalBlocks > 0) {
int numReplicationTasks = (int) Math.ceil(
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
+ " erasure-coded tasks: " + numECTasks);
}
// check pending replication tasks
// 获取numReplicationTasks个block做后续处理
List pendingList = nodeinfo.getReplicationCommand(
numReplicationTasks);
if (pendingList != null && !pendingList.isEmpty()) {
// If the block is deleted, the block size will become
// BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't
// need
// to send for replication or reconstruction
Iterator iterator = pendingList.iterator();
while (iterator.hasNext()) {
BlockTargetPair cmd = iterator.next();
// 判断是否已经被移除了
if (cmd.block != null
&& cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
// block deleted
DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
iterator.remove();
}
}
// 添加复制的命令
if (!pendingList.isEmpty()) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
}
// check pending erasure coding tasks
// 获取numEcTasks个ecBlock做后续处理
List pendingECList = nodeinfo
.getErasureCodeCommand(numECTasks);
if (pendingECList != null && !pendingECList.isEmpty()) {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
}
// check block invalidation
// 7.添加删除块的命令
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
blks));
}
// cache commands
// 8.添加缓存相关命令
addCacheCommands(blockPoolId, nodeinfo, cmds);
// key update command
// 更新access key
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
// 9.添加带宽相关命令
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
if (slowPeerTracker != null) {
final Map slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
}
}
}
if (slowDiskTracker != null) {
if (!slowDisks.getSlowDisks().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
slowDisks.getSlowDisks());
}
slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
}
slowDiskTracker.checkAndUpdateReportIfNecessary();
}
// 10. 返回所有命令
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
return new DatanodeCommand[0];
}



