Hadoop Source Code Analysis (5): ApplicationMasterService 2021SC@SDUSC

  • 1. Introduction to ApplicationMasterService
  • 2. The ApplicationMasterService Interface
  • 3. ApplicationMasterService Source Code Analysis
    • 3.1 Constructors
    • 3.2 Fields
    • 3.3 The serviceInit method
    • 3.4 The serviceStart method
    • 3.5 The registerApplicationMaster method
    • 3.6 The finishApplicationMaster method
    • 3.7 The allocate method

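1. Introduction to ApplicationMasterService

The previous chapter covered ApplicationMasterLauncher, the part of ApplicationMaster management that talks to NodeManagers in order to launch the ApplicationMaster (AM). ApplicationMasterService, by contrast, handles requests coming from the ApplicationMaster, chiefly registration and heartbeats. A registration request carries the host the AM was launched on, its RPC port, and its tracking URL. Heartbeats are periodic; each one reports the resources the AM needs, the list of containers to release, and the node blacklist. In its response, ApplicationMasterService returns the newly allocated containers, the failed containers, and the list of containers that are about to be preempted.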

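2. The ApplicationMasterService Interface

ApplicationMasterProtocol is the protocol the AM uses to talk to the ResourceManager (RM), and it is how the RM manages every submitted AM. Its main tasks are:
  • registering new AMs;
  • handling termination/unregistration requests from any AM that is finishing;
  • authenticating all requests from the different AMs, so that only requests from legitimate AMs reach the application object inside the RM;
  • receiving container allocation and release requests from all running AMs and forwarding them asynchronously to the YARN scheduler.
ApplicationMasterService guarantees that, at any point in time, only one thread of a given AM can have a request in progress on the RM, because the RM serializes all RPC requests coming from that AM.
The protocol has three main methods: registerApplicationMaster, finishApplicationMaster, and allocate.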

// A new ApplicationMaster registers with the RM.
// The AM supplies its RPC port, tracking URL, and similar details; the response
// reports the maximum resource capability the cluster can allocate.
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) 
  throws YarnException, IOException;

// The ApplicationMaster tells the RM its final status (succeeded/failed).
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) 
  throws YarnException, IOException;

// The ApplicationMaster requests resources from the ResourceManager; this call
// also serves as the heartbeat.
  public AllocateResponse allocate(AllocateRequest request) 
  throws YarnException, IOException;

3. ApplicationMasterService Source Code Analysis

3.1 Constructors

The instance is constructed from the serviceInit method of ResourceManager.

 public ApplicationMasterService(RMContext rmContext,
      YarnScheduler scheduler) {
    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
  }

  public ApplicationMasterService(String name, RMContext rmContext,
      YarnScheduler scheduler) {
    super(name);
    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
    this.rScheduler = scheduler;
    this.rmContext = rmContext;
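    // AMSProcessingChain applies the chain-of-responsibility pattern to the
    // requests sent by the ApplicationMaster (registration and the rest).
    // The head of the processor chain is currently DefaultAMSProcessor.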
    this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor());
  }
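
To make the chain-of-responsibility idea concrete, here is a minimal sketch in plain Java with no Hadoop dependency. Every type in it (Processor, DefaultProcessor, LoggingProcessor, ProcessingChain) is a hypothetical, simplified stand-in rather than the real Hadoop class, and it assumes that a newly added processor becomes the new head and delegates to the previous one.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainDemo {
  // Builds a two-link chain and records the order in which links run.
  static List<String> run() {
    ProcessingChain chain = new ProcessingChain(new DefaultProcessor());
    chain.addProcessor(new LoggingProcessor());
    List<String> trace = new ArrayList<>();
    chain.register("attempt_1", trace);
    return trace;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}

// Hypothetical, simplified stand-in for ApplicationMasterServiceProcessor.
interface Processor {
  void init(Processor next);
  void register(String attemptId, List<String> trace);
}

// Last link of the chain: does the actual work and delegates to nobody.
final class DefaultProcessor implements Processor {
  @Override public void init(Processor next) { /* nothing to chain to */ }
  @Override public void register(String attemptId, List<String> trace) {
    trace.add("default:" + attemptId);
  }
}

// An extra link: performs its own step, then hands over to the next link.
final class LoggingProcessor implements Processor {
  private Processor next;
  @Override public void init(Processor next) { this.next = next; }
  @Override public void register(String attemptId, List<String> trace) {
    trace.add("logging:" + attemptId);
    next.register(attemptId, trace);
  }
}

// Hypothetical stand-in for AMSProcessingChain.
final class ProcessingChain {
  private Processor head;
  ProcessingChain(Processor root) { this.head = root; }
  // Assumption: a new processor is placed in front of the current head.
  void addProcessor(Processor p) { p.init(head); head = p; }
  void register(String attemptId, List<String> trace) {
    head.register(attemptId, trace);
  }
}
```

The service itself only ever talks to the head of the chain, so extra behaviour can be plugged in without touching the RPC-facing code.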
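
3.2 Fields

  // Liveness monitor for AMs
  private final AMLivelinessMonitor amLivelinessMonitor;
  // Scheduler
  private YarnScheduler rScheduler;
  // Address the service listens on
  protected InetSocketAddress masterServiceAddress;

  // The RPC server
  protected Server server;
  protected final RecordFactory recordFactory =  RecordFactoryProvider.getRecordFactory(null);

  // Last response per application attempt (wrapped in a lock object)
  private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();

  // Application attempts whose finish request has already been handled
  private final ConcurrentHashMap<ApplicationAttemptId, Boolean> finishedAttemptCache =
      new ConcurrentHashMap<>();

  // RM context
  protected final RMContext rmContext;
  // Processing chain that handles AM requests
  private final AMSProcessingChain amsProcessingChain;
  // Whether timeline service v2 is enabled; false by default
  private boolean timelineServiceV2Enabled;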

3.3 The serviceInit method

serviceInit resolves masterServiceAddress, the address of the service (0.0.0.0/0.0.0.0:8030 by default), and then calls initializeProcessingChain.

Resolving masterServiceAddress:

  @Override
  protected void serviceInit(Configuration conf) throws Exception {

    // Resolve the address of the scheduler RPC service
    // 0.0.0.0/0.0.0.0:8030
    masterServiceAddress = conf.getSocketAddr(
        YarnConfiguration.RM_BIND_HOST,
        YarnConfiguration.RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);

    // Initialize amsProcessingChain
    initializeProcessingChain(conf);
  }
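
The address lookup above can be approximated in plain Java as follows. This is a sketch under stated assumptions, not Hadoop's implementation: the configuration is modelled as a Map, the address is assumed to be in host:port form, and a configured bind host is assumed to replace only the host part while the port still comes from the scheduler address. The class and method names (SchedulerAddressDemo, resolve) are made up for illustration; the two property names are the ones behind RM_BIND_HOST and RM_SCHEDULER_ADDRESS.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

final class SchedulerAddressDemo {
  static final String BIND_HOST = "yarn.resourcemanager.bind-host";
  static final String SCHEDULER_ADDRESS = "yarn.resourcemanager.scheduler.address";
  static final String DEFAULT_ADDRESS = "0.0.0.0:8030";

  // Resolves the listen address from a simplified key/value configuration.
  static InetSocketAddress resolve(Map<String, String> conf) {
    String address = conf.getOrDefault(SCHEDULER_ADDRESS, DEFAULT_ADDRESS);
    int colon = address.lastIndexOf(':');
    String host = address.substring(0, colon);
    int port = Integer.parseInt(address.substring(colon + 1));

    // Assumption: a non-empty bind host overrides the host, never the port.
    String bindHost = conf.get(BIND_HOST);
    if (bindHost != null && !bindHost.isEmpty()) {
      host = bindHost;
    }
    // createUnresolved avoids a DNS lookup in this illustration.
    return InetSocketAddress.createUnresolved(host, port);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(resolve(conf));

    conf.put(SCHEDULER_ADDRESS, "rm1.example.com:9030");
    conf.put(BIND_HOST, "0.0.0.0");
    System.out.println(resolve(conf));
  }
}
```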

Initializing the processing chain in initializeProcessingChain:

  private void initializeProcessingChain(Configuration conf) {
    amsProcessingChain.init(rmContext, null);

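    // Set up the placement-constraint handler; disabled by default
    // yarn.resourcemanager.placement-constraints.handler : disabled
    addPlacementConstraintHandler(conf);

    // Load the ApplicationMasterServiceProcessor instances named in the
    // configuration and add them to amsProcessingChain.
    List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);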
    if (processors != null) {
      Collections.reverse(processors);
      for (ApplicationMasterServiceProcessor p : processors) {
        // Ensure only single instance of PlacementProcessor is included
        if (p instanceof AbstractPlacementProcessor) {
          LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName()
              + " defined in "
              + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS
              + ", however PlacementProcessor handler should be configured "
              + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
              + ", this processor will be ignored.");
          continue;
        }
        this.amsProcessingChain.addProcessor(p);
      }
    }
  }
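
The call to Collections.reverse is easy to overlook. The sketch below shows why it matters, under the assumption that addProcessor always puts the new processor at the head of the chain: reversing first means the processors end up in the same order as they were configured, with DefaultAMSProcessor last. The chain is modelled as a Deque of names, the placement-constraint handler is left out, and ProcessorOrderDemo and buildChain are illustrative names only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

final class ProcessorOrderDemo {
  // Returns the chain from head to tail for the given configured order.
  static List<String> buildChain(List<String> configured) {
    Deque<String> chain = new ArrayDeque<>();
    chain.addFirst("DefaultAMSProcessor"); // the initial head

    List<String> reversed = new ArrayList<>(configured);
    Collections.reverse(reversed);
    for (String processor : reversed) {
      // Assumption: addProcessor places each processor in front of the head.
      chain.addFirst(processor);
    }
    return new ArrayList<>(chain);
  }

  public static void main(String[] args) {
    System.out.println(buildChain(Arrays.asList("P1", "P2")));
  }
}
```

With the configured list P1, P2 the resulting chain is P1, P2, DefaultAMSProcessor; without the reversal it would be P2, P1, DefaultAMSProcessor.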

3.4 The serviceStart method

The core of this method is starting the RPC server; in the author's environment it listens on BoYi-Pro.local/192.168.xx.xxx:8030.

  @Override
  protected void serviceStart() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);

    Configuration serverConf = conf;
    // If the auth is not-simple, enforce it to be token-based.
    serverConf = new Configuration(conf);

    serverConf.set(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        SaslRpcServer.AuthMethod.TOKEN.toString());

    // ProtobufRpcEngine$Server   ==> 0.0.0.0:8030
    this.server = getServer(rpc, serverConf, masterServiceAddress,
        this.rmContext.getAMRMTokenSecretManager());
    // TODO more exceptions could be added later.

    this.server.addTerseExceptions(ApplicationMasterNotRegisteredException.class);

    // Enable service authorization?
    if (conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
        false)) {


      InputStream inputStream =
          this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(conf,
                  YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);

      if (inputStream != null) {
        conf.addResource(inputStream);
      }
      refreshServiceAcls(conf, RMPolicyProvider.getInstance());
    }


    this.server.start();

    // Write the actual listener address back into the configuration:
    // BoYi-Pro.local/192.168.xx.xxx:8030
    this.masterServiceAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
                               YarnConfiguration.RM_SCHEDULER_ADDRESS,
                               YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
                               server.getListenerAddress());

    this.timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);

    super.serviceStart();
  }
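
3.5 The registerApplicationMaster method

This method is defined by ApplicationMasterProtocol and is used to register an ApplicationMaster.
It authorizes the incoming registration request, validates it, builds the response, and then delegates the actual registration to amsProcessingChain.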

public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {

    AMRMTokenIdentifier amrmTokenIdentifier =
        YarnServerSecurityUtils.authorizeRequest();
    ApplicationAttemptId applicationAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();

    // Get the ApplicationId
    ApplicationId appID = applicationAttemptId.getApplicationId();

    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
    if (lock == null) {
      RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
          AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
              + applicationAttemptId, "ApplicationMasterService",
          "Error in registering application master", appID,
          applicationAttemptId);
      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
    }

    // Allow only one thread in AM to do registerApp at a time.
    synchronized (lock) {

      AllocateResponse lastResponse = lock.getAllocateResponse();
      if (hasApplicationMasterRegistered(applicationAttemptId)) {
        // allow UAM re-register if work preservation is enabled
        ApplicationSubmissionContext appContext =
            rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
        if (!(appContext.getUnmanagedAM()
            && appContext.getKeepContainersAcrossApplicationAttempts())) {
          String message =
              AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
          LOG.warn(message);
          RMAuditLogger.logFailure(
              this.rmContext.getRMApps().get(appID).getUser(),
              AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
              message, appID, applicationAttemptId);
          throw new InvalidApplicationMasterRequestException(message);
        }
      }
      // Refresh the AM's liveness timestamp
      this.amLivelinessMonitor.receivedPing(applicationAttemptId);

      // Set the response id to 0 to mark the ApplicationMaster as registered
      // for this attempt id
      lastResponse.setResponseId(0);
      // Store the updated lastResponse
      lock.setAllocateResponse(lastResponse);

      RegisterApplicationMasterResponse response =
          recordFactory.newRecordInstance(
              RegisterApplicationMasterResponse.class);

      // Perform the registration through the processing chain
      this.amsProcessingChain.registerApplicationMaster(
          amrmTokenIdentifier.getApplicationAttemptId(), request, response);

      return response;
    }
  }
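
The locking and "already registered" logic can be reduced to the following plain-Java sketch. It is an illustration rather than the Hadoop code: ResponseLock is a hypothetical stand-in for AllocateResponseLock that stores only the last response id, attempt ids are plain strings, and it assumes that an attempt starts with response id -1 and counts as registered once the id is 0 or greater.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RegistrationDemo {
  // Hypothetical stand-in for AllocateResponseLock.
  static final class ResponseLock {
    private int lastResponseId;
    ResponseLock(int initial) { this.lastResponseId = initial; }
  }

  // Assumption: an unregistered attempt carries response id -1.
  static final int PRE_REGISTER_ID = -1;

  private final ConcurrentMap<String, ResponseLock> responseMap =
      new ConcurrentHashMap<>();

  // Called when the RM creates the attempt, before the AM ever connects.
  void onAttemptCreated(String attemptId) {
    responseMap.put(attemptId, new ResponseLock(PRE_REGISTER_ID));
  }

  void register(String attemptId) {
    ResponseLock lock = responseMap.get(attemptId);
    if (lock == null) {
      throw new IllegalStateException("unknown attempt: " + attemptId);
    }
    // One lock object per attempt: different AMs never block each other.
    synchronized (lock) {
      if (lock.lastResponseId >= 0) {
        throw new IllegalStateException("already registered: " + attemptId);
      }
      lock.lastResponseId = 0; // response id 0 marks the AM as registered
    }
  }

  boolean isRegistered(String attemptId) {
    ResponseLock lock = responseMap.get(attemptId);
    if (lock == null) {
      return false;
    }
    synchronized (lock) {
      return lock.lastResponseId >= 0;
    }
  }

  public static void main(String[] args) {
    RegistrationDemo service = new RegistrationDemo();
    service.onAttemptCreated("attempt_1");
    service.register("attempt_1");
    System.out.println(service.isRegistered("attempt_1"));
  }
}
```

Synchronizing on the per-attempt lock object rather than on the service keeps registration for one AM from stalling heartbeats of every other AM.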
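
3.6 The finishApplicationMaster method

This method is defined by ApplicationMasterProtocol. The ApplicationMaster calls it to tell ApplicationMasterService that it has finished.
After the checks below, it delegates to this.amsProcessingChain.finishApplicationMaster to carry out the unregistration.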

@Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) throws YarnException,
      IOException {

    // Authorize the request and get the applicationAttemptId
    ApplicationAttemptId applicationAttemptId =
        YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();

    // Get the ApplicationId
    ApplicationId appId = applicationAttemptId.getApplicationId();

    // Look up the RMApp
    RMApp rmApp =
        rmContext.getRMApps().get(applicationAttemptId.getApplicationId());

    // Remove collector address when app get finished.
    if (timelineServiceV2Enabled) {
      ((RMAppImpl) rmApp).removeCollectorData();
    }
    if (rmApp.isAppFinalStateStored()) {
      LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
      return FinishApplicationMasterResponse.newInstance(true);
    }

    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
    if (lock == null) {
      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
    }

    // Allow only one thread in AM to do finishApp at a time.
    synchronized (lock) {
      if (!hasApplicationMasterRegistered(applicationAttemptId)) {
        String message =
            "Application Master is trying to unregister before registering for: "
                + appId;
        LOG.error(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps()
                .get(appId).getUser(),
            AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
            message, appId,
            applicationAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      FinishApplicationMasterResponse response =
          FinishApplicationMasterResponse.newInstance(false);

      // putIfAbsent returns null only the first time this attempt is seen
      if (finishedAttemptCache.putIfAbsent(applicationAttemptId, true)
          == null) {
        // Not handled before: run the finish logic through the chain
        this.amsProcessingChain
            .finishApplicationMaster(applicationAttemptId, request, response);
      }
      // Refresh the AM's liveness timestamp
      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
      return response;
    }
  }
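
The putIfAbsent check is what makes repeated finish requests harmless. A minimal sketch of that idiom, using only java.util.concurrent (FinishOnceDemo and its counter are illustrative and not part of Hadoop):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class FinishOnceDemo {
  private final ConcurrentHashMap<String, Boolean> finishedAttemptCache =
      new ConcurrentHashMap<>();
  // Counts how often the (simulated) finish logic actually ran.
  private final AtomicInteger processed = new AtomicInteger();

  void finish(String attemptId) {
    // putIfAbsent is atomic: exactly one caller per key sees null.
    if (finishedAttemptCache.putIfAbsent(attemptId, true) == null) {
      processed.incrementAndGet();
    }
  }

  int processedCount() {
    return processed.get();
  }

  public static void main(String[] args) {
    FinishOnceDemo demo = new FinishOnceDemo();
    demo.finish("attempt_1");
    demo.finish("attempt_1"); // a retried request: ignored
    System.out.println(demo.processedCount());
  }
}
```

An AM typically keeps calling finishApplicationMaster until the response reports that it is unregistered, so the server side has to tolerate the same request arriving more than once.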
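
3.7 The allocate method

As with the other two methods, the request is ultimately handled by the processing chain, here through amsProcessingChain.allocate.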

  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = YarnServerSecurityUtils.authorizeRequest();

    ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId();

    // Refresh the AM's liveness timestamp
    this.amLivelinessMonitor.receivedPing(appAttemptId);

    // If the attempt is not in the cache, fail immediately.

    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "AM is not registered for known application attempt: "
                + appAttemptId
                + " or RM had restarted after AM registered. "
                + " AM should re-register.";
        throw new ApplicationMasterNotRegisteredException(message);
      }

      // Normally request.getResponseId() == lastResponse.getResponseId()
      if (AMRMClientUtils.getNextResponseId(
          request.getResponseId()) == lastResponse.getResponseId()) {
        // heartbeat one step old, simply return lastReponse
        return lastResponse;
      } else if (request.getResponseId() != lastResponse.getResponseId()) {
        throw new InvalidApplicationMasterRequestException(AMRMClientUtils
            .assembleInvalidResponseIdExceptionMessage(appAttemptId,
                lastResponse.getResponseId(), request.getResponseId()));
      }

      // Build the response
      AllocateResponse response =
          recordFactory.newRecordInstance(AllocateResponse.class);

      // Key step: delegate the allocation to the processing chain
      this.amsProcessingChain.allocate(
          amrmTokenIdentifier.getApplicationAttemptId(), request, response);

      // update AMRMToken if the token is rolled-up
      MasterKeyData nextMasterKey =
          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

      if (nextMasterKey != null
          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
          .getKeyId()) {

        // Look up the RMApp
        RMApp app =
            this.rmContext.getRMApps().get(appAttemptId.getApplicationId());


        RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);

        RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;

        Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
        if (nextMasterKey.getMasterKey().getKeyId() !=
            appAttemptImpl.getAMRMTokenKeyId()) {
          LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
              + " to application: " + appAttemptId.getApplicationId());
          amrmToken = rmContext.getAMRMTokenSecretManager()
              .createAndGetAMRMToken(appAttemptId);
          appAttemptImpl.setAMRMToken(amrmToken);
        }

        response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
            .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
                .toString(), amrmToken.getPassword(), amrmToken.getService()
                .toString()));
      }

      response.setResponseId(
          AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
      lock.setAllocateResponse(response);
      return response;
    }
  }
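
The response id comparison is the least obvious part of allocate, so here is the same decision logic isolated in plain Java. It is a sketch: the Outcome enum and the classify method are invented for illustration, and next assumes that AMRMClientUtils.getNextResponseId increments the id and wraps around to 0 after Integer.MAX_VALUE.

```java
final class ResponseIdDemo {
  enum Outcome { RESEND_LAST, PROCESS, REJECT }

  // Assumption: ids increase by one and wrap to 0 after Integer.MAX_VALUE.
  static int next(int responseId) {
    return (responseId + 1) & Integer.MAX_VALUE;
  }

  // requestId: id the AM sent; lastId: id of the last response the RM stored.
  static Outcome classify(int requestId, int lastId) {
    if (next(requestId) == lastId) {
      // The AM missed the previous response: send the same one again.
      return Outcome.RESEND_LAST;
    }
    if (requestId != lastId) {
      // Neither current nor one step behind: the request is invalid.
      return Outcome.REJECT;
    }
    // The ids match: handle the request and answer with next(lastId).
    return Outcome.PROCESS;
  }

  public static void main(String[] args) {
    System.out.println(classify(5, 5));
    System.out.println(classify(4, 5));
    System.out.println(classify(7, 5));
  }
}
```

Returning the stored response for a heartbeat that is one step behind lets an AM retry after a lost reply without the RM allocating containers twice for the same request.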

A later chapter will look at AMLivelinessMonitor in detail to round out this analysis of how the ApplicationMaster is managed.
