2021SC@SDUSC
- 一、ResourceTrackerService简介
- 二、ResourceTrackerService基本属性
- 三、ResourceTrackerService基本方法
- 3.1 serviceInit()方法
- 3.2 serviceStart()方法
- 3.3 registerNodeManager()方法
- 3.3 nodeHeartbeat()方法
- 3.4 unRegisterNodeManager()方法
NodeManager是执行在单个节点上的代理,负责管理Hadoop集群中的单个计算节点。主要有与ResourceManager保持通信,监控管理Container的生命周期和使用情况,管理日志和不同应用程序用到的附属服务等功能。
ResourceTrackerService负责处理来自NodeManager的请求,包括注册、心跳两种请求。注册是在NodeManager启动时发生的行为,请求包含节点id、可用的资源上限等信息。而心跳是周期性行为,包含各个Container的运行状态,运行的Application列表、节点健康状况等信息。ResourceTrackerService可为NodeManager返回待释放的Container列表、 Application列表等信息用来应答请求。
ResourceTrackerService的基本属性包括RM上下文信息、节点列表管理(NodesListManager)、NM存活监控(NMLivelinessMonitor)等,还有心跳间隔相关参数以及允许注册的最低NM版本等信息。
//org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService.Java //RM上下文 private final RMContext rmContext; //NodesListManager private final NodesListManager nodesListManager; //NM监控 private final NMLivelinessMonitor nmLivelinessMonitor; //安全 private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; //读锁 private final ReadLock readLock; //写锁 private final WriteLock writeLock; //下一次心跳间隔 private long nextHeartBeatInterval; //心跳间隔的相关参数,最大、最小值等 private boolean heartBeatIntervalScalingEnable; private long heartBeatIntervalMin; private long heartBeatIntervalMax; private float heartBeatIntervalSpeedupFactor; private float heartBeatIntervalSlowdownFactor; //rpc服务 private Server server; //绑定的地址 private InetSocketAddress resourceTrackerAddress; //最小的NM版本 private String minimumNodeManagerVersion; //最小分配内存 private int minAllocMb; //最小分配core private int minAllocVcores; private DecommissioningNodesWatcher decommissioningWatcher; private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; //timeline相关参数 private final AtomicLong timelineCollectorVersion = new AtomicLong(0); private boolean checkIpHostnameInRegistration; private boolean timelineServiceV2Enabled;
构造方法
/**
 * Creates the service and stores the collaborators it is given.
 *
 * <p>The service name passed to the superclass is this class's name. The
 * read and write locks are two views of one {@link ReentrantReadWriteLock},
 * and a {@code DecommissioningNodesWatcher} is created from {@code rmContext}.
 *
 * @param rmContext ResourceManager context
 * @param nodesListManager nodes list manager
 * @param nmLivelinessMonitor NodeManager liveliness monitor
 * @param containerTokenSecretManager secret manager for container tokens
 * @param nmTokenSecretManager secret manager for NM tokens
 */
public ResourceTrackerService(RMContext rmContext,
    NodesListManager nodesListManager,
    NMLivelinessMonitor nmLivelinessMonitor,
    RMContainerTokenSecretManager containerTokenSecretManager,
    NMTokenSecretManagerInRM nmTokenSecretManager) {
  super(ResourceTrackerService.class.getName());

  // Both lock views come from the same underlying read/write lock.
  final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  this.readLock = rwLock.readLock();
  this.writeLock = rwLock.writeLock();

  // Collaborators handed in by the caller.
  this.rmContext = rmContext;
  this.nodesListManager = nodesListManager;
  this.nmLivelinessMonitor = nmLivelinessMonitor;

  // Token secret managers.
  this.containerTokenSecretManager = containerTokenSecretManager;
  this.nmTokenSecretManager = nmTokenSecretManager;

  this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
}
三、ResourceTrackerService基本方法
3.1 serviceInit()方法
主要进行初始化:解析ResourceTracker的绑定地址,并从配置中读取最小分配内存、最小分配CPU(vcores)、最低NM版本等参数。
/**
 * Initializes the service from configuration.
 *
 * <p>Resolves the resource-tracker bind address, initializes the rack
 * resolver, and reads the registration IP/hostname check flag, the minimum
 * allocation (memory MB and vcores), the minimum NodeManager version, the
 * timeline service v2 flag and, when node labels are enabled, the node label
 * configuration mode. It then updates the heartbeat configuration, loads the
 * dynamic resource configuration, initializes the decommissioning watcher
 * and finally delegates to the superclass.
 *
 * @param conf configuration to read the settings from
 * @throws Exception if any initialization step fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Bind address of the resource tracker RPC endpoint
  resourceTrackerAddress = conf.getSocketAddr(
      YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);

  // Initialize the rack resolver
  RackResolver.init(conf);

  // Whether to check IP/hostname resolution at registration time
  checkIpHostnameInRegistration = conf.getBoolean(
      YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
      YarnConfiguration.DEFAULT_RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY);

  // Minimum allocation: memory in MB
  minAllocMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
  // Minimum allocation: virtual cores
  minAllocVcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);

  // Minimum accepted NodeManager version
  minimumNodeManagerVersion = conf.get(
      YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
      YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);

  timelineServiceV2Enabled = YarnConfiguration.
      timelineServiceV2Enabled(conf);

  // The node label mode flags are only read when node labels are enabled.
  if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
    isDistributedNodeLabelsConf =
        YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
    isDelegatedCentralizedNodeLabelsConf =
        YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
  }

  // Update the heartbeat configuration
  updateHeartBeatConfiguration(conf);
  // Load the dynamic resource configuration
  loadDynamicResourceConfiguration(conf);
  decommissioningWatcher.init(conf);
  super.serviceInit(conf);
}
3.2 serviceStart()方法
主要启动RPC服务,默认监听地址为0.0.0.0:8031。
/**
 * Starts the RPC server that serves the {@code ResourceTracker} protocol.
 *
 * <p>After the superclass start, the server is created on
 * {@code resourceTrackerAddress}. If service authorization is switched on in
 * the configuration, the policy file (when available) is added to the
 * configuration and the service ACLs are refreshed. The server is then
 * started and the actual listener address is written back into the
 * configuration.
 *
 * @throws Exception if the superclass start or the server setup fails
 */
@Override
protected void serviceStart() throws Exception {
  super.serviceStart();

  // If security is enabled, the ResourceTracker server authenticates
  // NodeManagers via Kerberos, hence no secret manager (null) is passed.
  final Configuration conf = getConfig();
  final YarnRPC yarnRpc = YarnRPC.create(conf);
  final int clientThreadCount = conf.getInt(
      YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT);
  this.server = yarnRpc.getServer(
      ResourceTracker.class, this, resourceTrackerAddress, conf, null,
      clientThreadCount);

  // Enable service authorization when configured
  final boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    final InputStream policyStream = this.rmContext
        .getConfigurationProvider()
        .getConfigurationInputStream(conf,
            YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
    if (policyStream != null) {
      conf.addResource(policyStream);
    }
    refreshServiceAcls(conf, RMPolicyProvider.getInstance());
  }

  this.server.start();

  // Record the address the server is actually listening on
  conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
      server.getListenerAddress());
}
3.3 registerNodeManager()方法
注册NM的方法,负责获取NM汇报的各种信息(NodeId、host、port、资源容量等),并在RM中完成节点的注册或重连。
/**
 * Handles a NodeManager registration request.
 *
 * <p>The request is answered with {@code NodeAction.SHUTDOWN} when:
 * <ul>
 *   <li>a minimum NM version is configured and the reported version is
 *       missing or lower;</li>
 *   <li>the IP/hostname check is enabled and the NM address is
 *       unresolved;</li>
 *   <li>the host is not a valid node and the node is not decommissioning;</li>
 *   <li>the node capability is below the minimum allocation.</li>
 * </ul>
 * Otherwise the node is inserted as a new {@code RMNode} or handled as a
 * reconnect, registered with the liveliness monitor, its node labels and
 * node attributes are processed, and a {@code NodeAction.NORMAL} response
 * carrying the current master keys is returned.
 *
 * @param request registration request from the NodeManager
 * @return the registration response
 * @throws YarnException declared by the method signature
 * @throws IOException declared by the method signature
 */
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
    RegisterNodeManagerRequest request) throws YarnException,
    IOException {
  // Node id, host, container-manager port and HTTP port
  NodeId nodeId = request.getNodeId();
  String host = nodeId.getHost();
  int cmPort = nodeId.getPort();
  int httpPort = request.getHttpPort();
  // Reported resource capability
  Resource capability = request.getResource();
  // Reported NodeManager version
  String nodeManagerVersion = request.getNMVersion();
  // Reported physical resource
  Resource physicalResource = request.getPhysicalResource();
  NodeStatus nodeStatus = request.getNodeStatus();

  RegisterNodeManagerResponse response = recordFactory
      .newRecordInstance(RegisterNodeManagerResponse.class);

  // Enforce the minimum NodeManager version unless it is "NONE".
  if (!minimumNodeManagerVersion.equals("NONE")) {
    if (minimumNodeManagerVersion.equals("EqualToRM")) {
      minimumNodeManagerVersion = YarnVersionInfo.getVersion();
    }
    if ((nodeManagerVersion == null) ||
        (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
      String message =
          "Disallowed NodeManager Version " + nodeManagerVersion
              + ", is less than the minimum version "
              + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
              + "NodeManager.";
      LOG.info(message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }
  }

  // Optionally verify that the NM hostname resolves.
  if (checkIpHostnameInRegistration) {
    InetSocketAddress nmAddress =
        NetUtils.createSocketAddrForHost(host, cmPort);
    InetAddress inetAddress = Server.getRemoteIp();
    if (inetAddress != null && nmAddress.isUnresolved()) {
      // Reject registration of unresolved NMs to prevent the RM from
      // getting stuck at allocation time.
      final String message =
          "hostname cannot be resolved (ip=" + inetAddress.getHostAddress()
              + ", hostname=" + host + ")";
      LOG.warn("Unresolved nodemanager registration: " + message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }
  }

  // Check that this is a valid node (or one that is decommissioning).
  if (!this.nodesListManager.isValidNode(host) &&
      !isNodeInDecommissioning(nodeId)) {
    String message =
        "Disallowed NodeManager from " + host
            + ", Sending SHUTDOWN signal to the NodeManager.";
    LOG.info(message);
    response.setDiagnosticsMessage(message);
    response.setNodeAction(NodeAction.SHUTDOWN);
    return response;
  }

  // Check whether the node's capability is loaded from dynamic-resources.xml.
  String nid = nodeId.toString();
  Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
  if (dynamicLoadCapability != null) {
    LOG.debug("Resource for node: {} is adjusted from: {} to: {} due to"
        + " settings in dynamic-resources.xml.", nid, capability,
        dynamicLoadCapability);
    capability = dynamicLoadCapability;
    // Sync the new resource back to the NM through the response.
    response.setResource(capability);
  }

  // Reject nodes whose capability is below the minimum allocation.
  if (capability.getMemorySize() < minAllocMb
      || capability.getVirtualCores() < minAllocVcores) {
    String message = "NodeManager from  " + host
        + " doesn't satisfy minimum allocations, Sending SHUTDOWN"
        + " signal to the NodeManager. Node capabilities are " + capability
        + "; minimums are " + minAllocMb + "mb and " + minAllocVcores
        + " vcores";
    LOG.info(message);
    response.setDiagnosticsMessage(message);
    response.setNodeAction(NodeAction.SHUTDOWN);
    return response;
  }

  response.setContainerTokenMasterKey(containerTokenSecretManager
      .getCurrentKey());
  response.setNMTokenMasterKey(nmTokenSecretManager
      .getCurrentKey());

  // Build the RMNode for this registration.
  RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
      resolve(host), capability, nodeManagerVersion, physicalResource);

  // putIfAbsent returns the node previously stored under this id, if any.
  RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
  if (oldNode == null) {
    // First registration of this node id: start the new RMNode.
    RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
        request.getNMContainerStatuses(),
        request.getRunningApplications(), nodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found the number of previous cached log aggregation "
            + "status from nodemanager:" + nodeId + " is :"
            + request.getLogAggregationReportsForApps().size());
      }
      startEvent.setLogAggregationReportsForApps(request
          .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(
        startEvent);
  } else {
    // Reconnect: drop the old liveliness registration first.
    LOG.info("Reconnect from the node at: " + host);
    this.nmLivelinessMonitor.unregister(nodeId);

    if (CollectionUtils.isEmpty(request.getRunningApplications())
        && rmNode.getState() != NodeState.DECOMMISSIONING
        && rmNode.getHttpPort() != oldNode.getHttpPort()) {
      // Reconnected node differs, so replace old node and start new node
      switch (rmNode.getState()) {
      case RUNNING:
        ClusterMetrics.getMetrics().decrNumActiveNodes();
        break;
      case UNHEALTHY:
        ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
        break;
      default:
        LOG.debug("Unexpected Rmnode state");
      }
      // Remove from the scheduler, store the new node and start it.
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new NodeRemovedSchedulerEvent(rmNode));

      this.rmContext.getRMNodes().put(nodeId, rmNode);
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
    } else {
      // Reset the heartbeat id on the existing node.
      oldNode.resetLastNodeHeartBeatResponse();

      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMNodeReconnectEvent(nodeId, rmNode,
              request.getRunningApplications(),
              request.getNMContainerStatuses()));
    }
  }

  // On every node manager registration, clear the NMToken key stored for
  // this node (if present).
  this.nmTokenSecretManager.removeNodeKey(nodeId);
  this.nmLivelinessMonitor.register(nodeId);

  // Handle the received container statuses; this must happen after the
  // new RMNode has been inserted.
  if (!rmContext.isWorkPreservingRecoveryEnabled()) {
    if (!request.getNMContainerStatuses().isEmpty()) {
      LOG.info("received container statuses on node manager register :"
          + request.getNMContainerStatuses());
      for (NMContainerStatus status : request.getNMContainerStatuses()) {
        handleNMContainerStatus(status, nodeId);
      }
    }
  }

  // Update the node labels in the RM's node label manager.
  Set<String> nodeLabels = NodeLabelsUtils.convertToStringSet(
      request.getNodeLabels());
  if (isDistributedNodeLabelsConf && nodeLabels != null) {
    try {
      updateNodeLabelsFromNMReport(nodeLabels, nodeId);
      response.setAreNodeLabelsAcceptedByRM(true);
    } catch (IOException ex) {
      // Make sure the failure is reported in the response.
      response.setDiagnosticsMessage(ex.getMessage());
      response.setAreNodeLabelsAcceptedByRM(false);
    }
  } else if (isDelegatedCentralizedNodeLabelsConf) {
    this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
  }

  // Update node's attributes to RM's NodeAttributesManager.
  if (request.getNodeAttributes() != null) {
    try {
      // update node attributes if necessary then update heartbeat response
      updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
      response.setAreNodeAttributesAcceptedByRM(true);
    } catch (IOException ex) {
      // Append the error to any diagnostics already present, separated by
      // a newline, so that it is sent back in the response.
      String errorMsg = response.getDiagnosticsMessage() == null ?
          ex.getMessage() :
          response.getDiagnosticsMessage() + "\n" + ex.getMessage();
      response.setDiagnosticsMessage(errorMsg);
      response.setAreNodeAttributesAcceptedByRM(false);
    }
  }

  // Summary log line for the successful registration.
  StringBuilder message = new StringBuilder();
  message.append("NodeManager from node ").append(host).append("(cmPort: ")
      .append(cmPort).append(" httpPort: ");
  message.append(httpPort).append(") ")
      .append("registered with capability: ").append(capability);
  message.append(", assigned nodeId ").append(nodeId);
  if (response.getAreNodeLabelsAcceptedByRM()) {
    message.append(", node labels { ").append(
        StringUtils.join(",", nodeLabels) + " } ");
  }
  if (response.getAreNodeAttributesAcceptedByRM()) {
    message.append(", node attributes { ")
        .append(request.getNodeAttributes() + " } ");
  }

  LOG.info(message.toString());
  response.setNodeAction(NodeAction.NORMAL);
  response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
  response.setRMVersion(YarnVersionInfo.getVersion());
  return response;
}
3.3 nodeHeartbeat()方法
NodeManager节点上的NodeStatusUpdaterImpl服务会通过StatusUpdaterRunnable线程的run()方法,定时向ResourceTrackerService发送心跳数据。
node心跳顺序:
1.检查node是否有效
2.检查是否注册,并更新心跳信息
3.检查是否为新的心跳,并且不是重复的
4.发送心跳状态给RMNode
5.若启用了分布式节点标签配置,则更新节点标签
/**
 * Handles a heartbeat from a NodeManager.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Reject (SHUTDOWN) nodes that are neither valid nor
 *       decommissioning.</li>
 *   <li>Ask unknown nodes to RESYNC; otherwise ping the liveliness monitor
 *       and update the decommissioning watcher.</li>
 *   <li>Return the previous response for a duplicate heartbeat; send a
 *       REBOOTING event and ask for RESYNC when the response ids do not
 *       match.</li>
 *   <li>Shut down a DECOMMISSIONING node that is ready to be
 *       decommissioned.</li>
 *   <li>Build the new heartbeat response, dispatch the node status event,
 *       and process node labels, resource updates, the container queuing
 *       limit and node attributes.</li>
 * </ol>
 *
 * @param request heartbeat request from the NodeManager
 * @return the heartbeat response
 * @throws YarnException declared by the method signature
 * @throws IOException declared by the method signature
 */
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    throws YarnException, IOException {
  // Status reported by the node
  NodeStatus remoteNodeStatus = request.getNodeStatus();
  NodeId nodeId = remoteNodeStatus.getNodeId();

  // 1. Check that the node is valid or is decommissioning.
  if (!this.nodesListManager.isValidNode(nodeId.getHost())
      && !isNodeInDecommissioning(nodeId)) {
    String message =
        "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
            + nodeId.getHost();
    LOG.info(message);
    return YarnServerBuilderUtils.newNodeHeartbeatResponse(
        NodeAction.SHUTDOWN, message);
  }

  // 2. Check that the node is registered.
  RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
  if (rmNode == null) {
    // Unknown node: ask it to resync.
    String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
    LOG.info(message);
    return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
        message);
  }

  // Report the ping to the liveliness monitor.
  this.nmLivelinessMonitor.receivedPing(nodeId);
  this.decommissioningWatcher.update(rmNode, remoteNodeStatus);

  // 3. Check that the heartbeat is new and not a duplicate.
  NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
  if (getNextResponseId(
      remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
          .getResponseId()) {
    LOG.info("Received duplicate heartbeat from node "
        + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
    return lastNodeHeartbeatResponse;
  } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
      .getResponseId()) {
    String message =
        "Too far behind rm response id:"
            + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
            + remoteNodeStatus.getResponseId();
    LOG.info(message);
    // Send a REBOOTING event for this node and ask the NM to resync.
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
    return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
        message);
  }

  // A DECOMMISSIONING node that the watcher reports as ready is
  // decommissioned and told to shut down.
  if (rmNode.getState() == NodeState.DECOMMISSIONING &&
      decommissioningWatcher.checkReadyToBeDecommissioned(
          rmNode.getNodeID())) {
    String message = "DECOMMISSIONING " + nodeId +
        " is ready to be decommissioned";
    LOG.info(message);
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
    this.nmLivelinessMonitor.unregister(nodeId);
    return YarnServerBuilderUtils.newNodeHeartbeatResponse(
        NodeAction.SHUTDOWN, message);
  }

  if (timelineServiceV2Enabled) {
    // Check and update the app collectors map from the request.
    updateAppCollectorsMap(request);
  }

  // Build the heartbeat response.
  long newInterval = nextHeartBeatInterval;
  if (heartBeatIntervalScalingEnable) {
    newInterval = rmNode.calculateHeartBeatInterval(
        nextHeartBeatInterval, heartBeatIntervalMin,
        heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
        heartBeatIntervalSlowdownFactor);
  }
  NodeHeartbeatResponse nodeHeartBeatResponse =
      YarnServerBuilderUtils.newNodeHeartbeatResponse(
          getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
          NodeAction.NORMAL, null, null, null, null, newInterval);
  rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

  populateKeys(request, nodeHeartBeatResponse);
  populateTokenSequenceNo(request, nodeHeartBeatResponse);

  if (timelineServiceV2Enabled) {
    // Return the collectors map for the apps running on this node.
    setAppCollectorsMapToResponse(rmNode.getRunningApps(),
        nodeHeartBeatResponse);
  }

  // 4. Dispatch the node status event for this RMNode.
  RMNodeStatusEvent nodeStatusEvent =
      new RMNodeStatusEvent(nodeId, remoteNodeStatus);
  if (request.getLogAggregationReportsForApps() != null
      && !request.getLogAggregationReportsForApps().isEmpty()) {
    nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
  }
  this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

  // 5. Update the node labels when distributed node labels are configured.
  if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
    try {
      updateNodeLabelsFromNMReport(
          NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
          nodeId);
      nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
    } catch (IOException ex) {
      // Make sure the failure is reported in the response.
      nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
      nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
    }
  }

  // 6. If the node's capability is loaded from dynamic-resources.xml,
  // send the resource back in the response.
  String nid = nodeId.toString();
  Resource capability = loadNodeResourceFromDRConfiguration(nid);
  if (capability != null) {
    nodeHeartBeatResponse.setResource(capability);
  }
  // Check if we got an event (AdminService) that updated the resources
  if (rmNode.isUpdatedCapability()) {
    nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
    rmNode.resetUpdatedCapability();
  }

  // 7. Attach the container queuing limit when a queue-limit calculator
  // is available.
  if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
    nodeHeartBeatResponse.setContainerQueuingLimit(
        this.rmContext.getNodeManagerQueueLimitCalculator()
            .createContainerQueuingLimit());
  }

  // 8. Process the reported node attributes.
  if (request.getNodeAttributes() != null) {
    try {
      // Update node attributes if necessary, then update the response.
      updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
      nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true);
    } catch (IOException ex) {
      // Append the error to any diagnostics already present, separated by
      // a newline, so that it is sent back in the response.
      String errorMsg =
          nodeHeartBeatResponse.getDiagnosticsMessage() == null ?
              ex.getMessage() :
              nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex
                  .getMessage();
      nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg);
      nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false);
    }
  }

  return nodeHeartBeatResponse;
}
3.4 unRegisterNodeManager()方法
根据请求信息中的nodeId,解除注册
/**
 * Handles a NodeManager un-registration request.
 *
 * <p>If no {@code RMNode} is stored for the request's node id, the request
 * is ignored and an empty response is returned. Otherwise the node is
 * removed from the liveliness monitor and a SHUTDOWN event is dispatched
 * for it.
 *
 * @param request un-registration request carrying the node id
 * @return the (empty) un-registration response
 * @throws YarnException declared by the method signature
 * @throws IOException declared by the method signature
 */
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
    UnRegisterNodeManagerRequest request) throws YarnException, IOException {
  UnRegisterNodeManagerResponse response = recordFactory
      .newRecordInstance(UnRegisterNodeManagerResponse.class);
  // Node id taken from the request
  NodeId nodeId = request.getNodeId();
  // Look up the RMNode currently stored for that id
  RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
  if (rmNode == null) {
    LOG.info("Node not found, ignoring the unregister from node id : "
        + nodeId);
    return response;
  }
  LOG.info("Node with node id : " + nodeId
      + " has shutdown, hence unregistering the node.");
  // Stop liveliness tracking, then notify the node of the shutdown.
  this.nmLivelinessMonitor.unregister(nodeId);
  this.rmContext.getDispatcher().getEventHandler()
      .handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN));
  return response;
}



