2021SC@SDUSC
- 一、ApplicationMasterService简介
- 二、ApplicationMasterService接口
- 三、ApplicationMasterService源码分析
- 3.1 构造方法
- 3.2 属性
- 3.3 serviceInit 方法
- 3.4 serviceStart方法
- 3.5 registerApplicationMaster方法
- 3.6 finishApplicationMaster方法
- 3.7 allocate方法
上一章介绍了ApplicationMasterLauncher的有关内容,ApplicationMasterLauncher是ApplicationMaster管理的一部分,它主要负责与NodeManager通信,以完成ApplicationMaster的启动。而ApplicationMasterService主要负责处理来自ApplicationMaster的请求,包括注册和心跳两种请求。其中注册请求包含Application启动节点、对外RPC端口号和tracking URL等信息;心跳是周期性行为,汇报信息包含所需资源描述、待释放的Container列表、黑名单列表等。ApplicationMasterService则会在心跳的响应中为其返回新分配的Container、失败的Container、待抢占的Container列表等信息。
ApplicationMasterProtocol协议是AM与RM通信的协议,用于对所有提交的AM进行管理。主要有以下任务:
注册新的AM、来自任意正在结束的AM的终止/取消注册请求、认证来自不同AM的所有请求,
确保合法的AM发送的请求传递给RM中的应用程序对象、获取来自所有运行AM的Container的分配和释放请求、异步的转发给Yarn调度器。
ApplicationMasterService确保了任意时间点、任意AM只有一个线程可以发送请求给RM,因为在RM上所有来自AM的RPC请求都串行化了。
该协议主要有三个方法:
registerApplicationMaster,finishApplicationMaster,allocate
// A newly started ApplicationMaster registers itself with the ResourceManager.
// The ApplicationMaster supplies its RPC port, URL and similar information to the RM;
// the response reports the maximum resource capability the cluster can offer.
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException;
// The ApplicationMaster notifies the RM of its final status (succeeded/failed).
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException;
// The ApplicationMaster requests resources from the ResourceManager; the same call
// doubles as its heartbeat.
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;
三、ApplicationMasterService源码分析
3.1 构造方法
通过ResourceManager 的serviceInit 方法构建
/**
 * Creates the service, using the fully qualified class name of
 * {@code ApplicationMasterService} as the service name, and delegates to the
 * three-argument constructor.
 *
 * @param rmContext ResourceManager context forwarded to the delegate constructor
 * @param scheduler scheduler forwarded to the delegate constructor
 */
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
  this(ApplicationMasterService.class.getName(), rmContext, scheduler);
}
/**
 * Creates the service under an explicit service name.
 *
 * @param name service name handed to the superclass constructor
 * @param rmContext ResourceManager context; also supplies the AM liveliness monitor
 * @param scheduler scheduler kept by this service
 */
public ApplicationMasterService(String name, RMContext rmContext,
    YarnScheduler scheduler) {
  super(name);
  this.rmContext = rmContext;
  this.rScheduler = scheduler;
  this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
  // ApplicationMaster requests are dispatched through a chain of processors
  // (chain-of-responsibility); DefaultAMSProcessor is installed as the head.
  this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor());
}
3.2 属性
// AM监控
private final AMLivelinessMonitor amLivelinessMonitor;
// 调度器
private YarnScheduler rScheduler;
// 接口地址
protected InetSocketAddress masterServiceAddress;
// 服务实体
protected Server server;
protected final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
// 存储响应实体
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
// ApplicationAttemptId 状态
private final ConcurrentHashMap<ApplicationAttemptId, Boolean> finishedAttemptCache = new ConcurrentHashMap<>();
// RM信息
protected final RMContext rmContext;
// 存放AM的处理Chain
private final AMSProcessingChain amsProcessingChain;
// 是否启用timelineServiceV2 , 默认 false
private boolean timelineServiceV2Enabled;
3.3 serviceInit 方法
初始化了masterServiceAddress 服务地址: 0.0.0.0/0.0.0.0:8030。然后开始初始化 initializeProcessingChain
初始化了masterServiceAddress 服务地址
/**
 * Resolves the address this service will serve on and prepares the
 * processing chain.
 *
 * @param conf configuration the scheduler address is read from
 * @throws Exception if initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Resolve the RPC endpoint from the bind-host and scheduler-address settings,
  // with the default scheduler address/port passed as fallbacks
  // (the original notes observed 0.0.0.0/0.0.0.0:8030 here).
  this.masterServiceAddress =
      conf.getSocketAddr(
          YarnConfiguration.RM_BIND_HOST,
          YarnConfiguration.RM_SCHEDULER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);

  // Set up amsProcessingChain.
  initializeProcessingChain(conf);
}
初始化initializeProcessingChain
/**
 * Initializes the processing chain and appends the configured processors.
 *
 * <p>The chain is initialized with the RM context, the placement-constraint
 * handler is added, and then every processor returned by
 * {@code getProcessorList(conf)} is appended after the list has been reversed.
 * Any {@code AbstractPlacementProcessor} found in that list is skipped with a
 * warning, because the placement handler is configured through its own setting.
 *
 * @param conf configuration the processor list and placement handler are read from
 */
private void initializeProcessingChain(Configuration conf) {
  amsProcessingChain.init(rmContext, null);

  // Placement-constraint handling. Per the original notes the handler setting
  // (yarn.resourcemanager.placement-constraints.handler) is "disabled" by default.
  addPlacementConstraintHandler(conf);

  // Processors taken from the configuration are added to amsProcessingChain.
  // The element type must be declared: iterating a raw List as
  // ApplicationMasterServiceProcessor does not compile.
  List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
  if (processors != null) {
    Collections.reverse(processors);
    for (ApplicationMasterServiceProcessor p : processors) {
      // Ensure only single instance of PlacementProcessor is included
      if (p instanceof AbstractPlacementProcessor) {
        LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName()
            + " defined in "
            + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS
            + ", however PlacementProcessor handler should be configured "
            + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
            + ", this processor will be ignored.");
        continue;
      }
      this.amsProcessingChain.addProcessor(p);
    }
  }
}
3.4 serviceStart方法
核心就是启动server服务: BoYi-Pro.local/192.168.xx.xxx:8030
/**
 * Starts the RPC server that ApplicationMasters talk to.
 *
 * <p>The server is created from a copy of the service configuration in which
 * the authentication method is forced to TOKEN. When service authorization is
 * enabled, the policy file is loaded and the service ACLs are refreshed before
 * the server starts. After start-up the connect address is updated from the
 * server's listener address and the timeline-service-v2 flag is read.
 *
 * @throws Exception if the server cannot be created or started
 */
@Override
protected void serviceStart() throws Exception {
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);

  // If the auth is not-simple, enforce it to be token-based. The change is
  // made on a copy so the service configuration itself is left untouched.
  Configuration serverConf = new Configuration(conf);
  serverConf.set(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      SaslRpcServer.AuthMethod.TOKEN.toString());

  // Per the original notes this yields a ProtobufRpcEngine server on 0.0.0.0:8030.
  this.server = getServer(
      rpc,
      serverConf,
      masterServiceAddress,
      this.rmContext.getAMRMTokenSecretManager());

  // TODO more exceptions could be added later.
  this.server.addTerseExceptions(ApplicationMasterNotRegisteredException.class);

  // Enable service authorization?
  boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    InputStream policyStream = this.rmContext.getConfigurationProvider()
        .getConfigurationInputStream(
            conf, YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
    if (policyStream != null) {
      conf.addResource(policyStream);
    }
    refreshServiceAcls(conf, RMPolicyProvider.getInstance());
  }

  this.server.start();

  // Refresh the configured address with the address the server actually
  // listens on (the original notes observed host/192.168.xx.xxx:8030).
  this.masterServiceAddress = conf.updateConnectAddr(
      YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_SCHEDULER_ADDRESS,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
      server.getListenerAddress());
  this.timelineServiceV2Enabled =
      YarnConfiguration.timelineServiceV2Enabled(conf);

  super.serviceStart();
}
3.5 registerApplicationMaster方法
ApplicationMasterProtocol协议中定义的方法. 用于注册application.
获取注册请求, 校验通过之后, 构建response , 然后使用amsProcessingChain进行注册操作。
/**
 * Registers an ApplicationMaster for the application attempt identified by
 * the caller's AMRM token.
 *
 * <p>The request is authorized, the attempt's lock entry is looked up in
 * {@code responseMap}, and, while holding that lock, a repeated registration
 * is rejected unless the application is an unmanaged AM that keeps containers
 * across attempts. On success the liveliness monitor is pinged, the stored
 * response id is reset to 0, and the registration itself is delegated to the
 * processing chain.
 *
 * @param request registration request sent by the ApplicationMaster
 * @return the response populated by the processing chain
 * @throws InvalidApplicationMasterRequestException if the attempt is already
 *     registered and re-registration is not permitted
 * @throws YarnException as declared by the protocol
 * @throws IOException as declared by the protocol
 */
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
    RegisterApplicationMasterRequest request) throws YarnException,
    IOException {
  AMRMTokenIdentifier amrmTokenIdentifier =
      YarnServerSecurityUtils.authorizeRequest();
  ApplicationAttemptId applicationAttemptId =
      amrmTokenIdentifier.getApplicationAttemptId();
  // Application the attempt belongs to.
  ApplicationId appID = applicationAttemptId.getApplicationId();

  AllocateResponseLock lock = responseMap.get(applicationAttemptId);
  if (lock == null) {
    // NOTE(review): getRMApps().get(appID) is dereferenced without a null
    // check; confirm the app is always present when the attempt is unknown.
    RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
        AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
            + applicationAttemptId, "ApplicationMasterService",
        "Error in registering application master", appID,
        applicationAttemptId);
    // Presumably always throws, so the synchronized block below never sees a
    // null lock; verify against the helper.
    throwApplicationDoesNotExistInCacheException(applicationAttemptId);
  }

  // Allow only one thread in AM to do registerApp at a time.
  synchronized (lock) {
    AllocateResponse lastResponse = lock.getAllocateResponse();
    if (hasApplicationMasterRegistered(applicationAttemptId)) {
      // allow UAM re-register if work preservation is enabled
      ApplicationSubmissionContext appContext =
          rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
      if (!(appContext.getUnmanagedAM()
          && appContext.getKeepContainersAcrossApplicationAttempts())) {
        String message =
            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
        LOG.warn(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps().get(appID).getUser(),
            AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
            message, appID, applicationAttemptId);
        throw new InvalidApplicationMasterRequestException(message);
      }
    }

    // Refresh the heartbeat timestamp.
    this.amLivelinessMonitor.receivedPing(applicationAttemptId);

    // A response id of 0 marks the ApplicationMaster as registered for this
    // attempt id.
    lastResponse.setResponseId(0);
    // Store the updated last response back on the lock.
    lock.setAllocateResponse(lastResponse);

    RegisterApplicationMasterResponse response =
        recordFactory.newRecordInstance(
            RegisterApplicationMasterResponse.class);
    // Perform the actual registration through the processing chain.
    this.amsProcessingChain.registerApplicationMaster(
        amrmTokenIdentifier.getApplicationAttemptId(), request, response);
    return response;
  }
}
3.6 finishApplicationMaster方法
ApplicationMasterProtocol协议中定义的方法. 用于 App Master 通知ApplicationMasterService .
直接调用this.amsProcessingChain.finishApplicationMaster 执行注销操作。
/**
 * Handles the unregister call an ApplicationMaster sends when it finishes.
 *
 * <p>If the application's final state is already stored, a successful
 * response is returned immediately. Otherwise the attempt must be known and
 * registered; the processing chain is invoked at most once per attempt
 * (guarded by {@code finishedAttemptCache}) and the liveliness monitor is
 * pinged.
 *
 * @param request finish request sent by the ApplicationMaster
 * @return a response created with {@code true} when the final state is already
 *     stored, otherwise the response handed to the processing chain
 * @throws ApplicationMasterNotRegisteredException if the attempt never registered
 * @throws YarnException as declared by the protocol
 * @throws IOException as declared by the protocol
 */
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
    FinishApplicationMasterRequest request)
    throws YarnException, IOException {
  // Attempt id carried by the caller's token.
  ApplicationAttemptId applicationAttemptId =
      YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
  // Owning application and its RM-side record.
  ApplicationId appId = applicationAttemptId.getApplicationId();
  RMApp rmApp = rmContext.getRMApps().get(appId);

  // Remove collector address when app get finished.
  if (timelineServiceV2Enabled) {
    ((RMAppImpl) rmApp).removeCollectorData();
  }

  if (rmApp.isAppFinalStateStored()) {
    LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
    return FinishApplicationMasterResponse.newInstance(true);
  }

  AllocateResponseLock attemptLock = responseMap.get(applicationAttemptId);
  if (attemptLock == null) {
    throwApplicationDoesNotExistInCacheException(applicationAttemptId);
  }

  // Allow only one thread in AM to do finishApp at a time.
  synchronized (attemptLock) {
    if (!hasApplicationMasterRegistered(applicationAttemptId)) {
      String message =
          "Application Master is trying to unregister before registering for: "
              + appId;
      LOG.error(message);
      RMAuditLogger.logFailure(
          this.rmContext.getRMApps().get(appId).getUser(),
          AuditConstants.UNREGISTER_AM,
          "",
          "ApplicationMasterService",
          message,
          appId,
          applicationAttemptId);
      throw new ApplicationMasterNotRegisteredException(message);
    }

    FinishApplicationMasterResponse response =
        FinishApplicationMasterResponse.newInstance(false);

    // Only the first finish call for an attempt reaches the processing chain;
    // putIfAbsent returns null exactly when the attempt was not cached yet.
    boolean firstFinishCall =
        finishedAttemptCache.putIfAbsent(applicationAttemptId, true) == null;
    if (firstFinishCall) {
      this.amsProcessingChain
          .finishApplicationMaster(applicationAttemptId, request, response);
    }

    // Count this call as a heartbeat.
    this.amLivelinessMonitor.receivedPing(applicationAttemptId);
    return response;
  }
}
3.7 allocate方法
还是调用amsProcessingChain.allocate 处理请求。
/**
 * Handles an allocate call (resource request / heartbeat) from an
 * ApplicationMaster.
 *
 * <p>The liveliness monitor is pinged first. The attempt must have an entry
 * in {@code responseMap} and must be registered. While holding the attempt's
 * lock the request's response id is checked against the last response: a
 * request that is exactly one step behind gets the previous response back
 * unchanged, any other mismatch is rejected. Otherwise a new response is
 * filled in by the processing chain, a new AMRM token is attached if the
 * master key differs from the one in the caller's token, and the response
 * (with the next response id) is stored as the new last response.
 *
 * @param request allocate request sent by the ApplicationMaster
 * @return the new response, or the previous one for a repeated heartbeat
 * @throws ApplicationAttemptNotFoundException if the attempt is not in the cache
 * @throws ApplicationMasterNotRegisteredException if the attempt is not registered
 * @throws InvalidApplicationMasterRequestException if the response id is out of sequence
 * @throws YarnException as declared by the protocol
 * @throws IOException as declared by the protocol
 */
@Override
public AllocateResponse allocate(AllocateRequest request)
    throws YarnException, IOException {
  AMRMTokenIdentifier amrmTokenIdentifier =
      YarnServerSecurityUtils.authorizeRequest();
  ApplicationAttemptId appAttemptId =
      amrmTokenIdentifier.getApplicationAttemptId();

  // Refresh the heartbeat timestamp.
  this.amLivelinessMonitor.receivedPing(appAttemptId);

  // An attempt without a cache entry is rejected outright.
  AllocateResponseLock lock = responseMap.get(appAttemptId);
  if (lock == null) {
    String message =
        "Application attempt " + appAttemptId
            + " doesn't exist in ApplicationMasterService cache.";
    LOG.error(message);
    throw new ApplicationAttemptNotFoundException(message);
  }

  synchronized (lock) {
    AllocateResponse lastResponse = lock.getAllocateResponse();
    if (!hasApplicationMasterRegistered(appAttemptId)) {
      String message =
          "AM is not registered for known application attempt: "
              + appAttemptId
              + " or RM had restarted after AM registered. "
              + " AM should re-register.";
      throw new ApplicationMasterNotRegisteredException(message);
    }

    // Normally request.getResponseId() == lastResponse.getResponseId()
    if (AMRMClientUtils.getNextResponseId(
        request.getResponseId()) == lastResponse.getResponseId()) {
      // heartbeat one step old, simply return lastResponse
      return lastResponse;
    } else if (request.getResponseId() != lastResponse.getResponseId()) {
      throw new InvalidApplicationMasterRequestException(AMRMClientUtils
          .assembleInvalidResponseIdExceptionMessage(appAttemptId,
              lastResponse.getResponseId(), request.getResponseId()));
    }

    // Build the response object the chain will populate.
    AllocateResponse response =
        recordFactory.newRecordInstance(AllocateResponse.class);

    // Core step: the processing chain does the actual allocation work.
    this.amsProcessingChain.allocate(
        amrmTokenIdentifier.getApplicationAttemptId(), request, response);

    // update AMRMToken if the token is rolled-up
    MasterKeyData nextMasterKey =
        this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
    if (nextMasterKey != null
        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
            .getKeyId()) {
      // Look up the RM-side application and attempt.
      RMApp app =
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
      RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl) appAttempt;
      // Parameterized rather than raw, so the token type stays checked.
      Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
      if (nextMasterKey.getMasterKey().getKeyId() !=
          appAttemptImpl.getAMRMTokenKeyId()) {
        LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
            + " to application: " + appAttemptId.getApplicationId());
        amrmToken = rmContext.getAMRMTokenSecretManager()
            .createAndGetAMRMToken(appAttemptId);
        appAttemptImpl.setAMRMToken(amrmToken);
      }
      response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
              .toString(), amrmToken.getPassword(), amrmToken.getService()
              .toString()));
    }

    response.setResponseId(
        AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
    lock.setAllocateResponse(response);
    return response;
  }
}
之后会详细介绍AMLivelinessMonitor的相关内容,以便完善对ApplicationMaster的分析研究。



