栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop yarn源码分析(六) AMLivelinessMonitor源码分析 2021SC@SDUSC

Hadoop yarn源码分析(六) AMLivelinessMonitor源码分析 2021SC@SDUSC

2021SC@SDUSC

Hadoop yarn源码分析(六)AMLivelinessMonitor源码分析 2021SC@SDUSC
  • 一、AMLivelinessMonitor简介
  • 二、AMLivelinessMonitor属性
  • 三、AMLivelinessMonitor主要方法
    • 3.1 构造方法
    • 3.2 初始化方法
    • 3.3 超时回调
    • 3.4 启动服务
    • 3.5 注册监控
    • 3.6 内部类

一、AMLivelinessMonitor简介

上一章,介绍了ApplicationMasterService的有关内容,ApplicationMasterService主要是处理来自ApplicationMaster的请求,包括注册和心跳两种请求。而AMLivelinessMonitor负责监控管理。该服务周期性遍历所有程序的AM,若在一定时间内没有收到心跳信息,则认为该程序已死,对应的Container运行失败。若AM运行失败,则RM重新申请资源,以便重新分配并执行。

二、AMLivelinessMonitor属性

AbstractLivelinessMonitor是AMLivelinessMonitor的父类,是一个抽象类,是一个活跃度注册器,客户端可以注册,并监视组件的活跃度,每隔一段时间得到回馈,最终取消注册。
在该类中,有几个重要的参数:

//org.apache.hadoop.yarn.util.AbstractLivelinessMonitor.java
  //定时监测心跳的线程
  private Thread checkerThread;
  //标识是否停止
  private volatile boolean stopped;
  //设置时间间隔,为5mins
  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
  //设置过期时间间隔
  private long expireInterval = DEFAULT_EXPIRE;
  //设置监控时间间隔,为过期时间间隔的1/3
  private long monitorInterval = expireInterval / 3;
  //是否在启动时重置
  private volatile boolean resetTimeronStart = true;
  //时钟
  private final Clock clock;
  //正在监控程序的上一次运行时间
  private Map running = new HashMap();
三、AMLivelinessMonitor主要方法 3.1 构造方法

主要有两个属性,Dispatcher调度器,Clock时钟

//org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor.java
  public AMLivelinessMonitor(Dispatcher d) {
    super("AMLivelinessMonitor");
    this.dispatcher = d.getEventHandler();
  }

  public AMLivelinessMonitor(Dispatcher d, Clock clock) {
    super("AMLivelinessMonitor", clock);
    this.dispatcher = d.getEventHandler();
  }
3.2 初始化方法

serviceInit方法完成初始化,设置对应的过期时间间隔和监控时间间隔。其中,监控时间间隔为过期时间间隔的1/3

  public void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    //过期时间间隔
    int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
    setExpireInterval(expireIntvl);
    //监控时间间隔
    setMonitorInterval(expireIntvl/3);
  }
3.3 超时回调

用dispatcher调度器进行超时回调处理,超过一定时间,则调用该函数

  protected void expire(ApplicationAttemptId id) {
    dispatcher.handle(
        new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
  }
3.4 启动服务

serviceStart方法负责启动服务,构建了一个checkerThread线程,用于监控

//org.apache.hadoop.yarn.util.AbstractLivelinessMonitor.java
  protected void serviceStart() throws Exception {
    assert !stopped : "starting when already stopped";
    resetTimer();
    checkerThread = new Thread(new PingChecker());
    checkerThread.setName("Ping Checker for "+getName());
    checkerThread.start();
    super.serviceStart();
  }
3.5 注册监控

receivedPing方法,只能传入已经注册过的对象和对应的时间

//org.apache.hadoop.yarn.util.AbstractLivelinessMonitor.java
  public synchronized void receivedPing(O ob) {
    //only put for the registered objects
    if (running.containsKey(ob)) {
      running.put(ob, clock.getTime());
    }
  }
3.6 内部类

PingChecker是实现Runnable方法的AbstractLivelinessMonitor的内部类,负责迭代running中记录的数据,验证是否过期。过期则调用回调函数expire。

//org.apache.hadoop.yarn.util.AbstractLivelinessMonitor.java
  //内部类,实现Runnable方法
  private class PingChecker implements Runnable {

    @Override
    public void run() {
      //正常运行
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        synchronized (AbstractLivelinessMonitor.this) {
          Iterator> iterator = running.entrySet().iterator();

          // 避免每次在循环中计算当前时间
          long currentTime = clock.getTime();

          while (iterator.hasNext()) {
            Map.Entry entry = iterator.next();
            O key = entry.getKey();
            long interval = getExpireInterval(key);
            //在超时的情况下
            if (currentTime > entry.getValue() + interval) {
              iterator.remove();
              expire(key);
              LOG.info("Expired:" + entry.getKey().toString()
                  + " Timed out after " + interval / 1000 + " secs");
            }
          }
        }
        //异常处理
        try {
          Thread.sleep(monitorInterval);
        } catch (InterruptedException e) {
          LOG.info(getName() + " thread interrupted");
          break;
        }
      }
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487948.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号