
2021SC@SDUSC HBase Project Analysis: Master Startup (Part 1)

Contents

Master Overview

Overall Master Architecture

Master Components

Master Startup Source-Code Analysis

Master Overview

Overall Master Architecture

Master Components

ZooKeeperWatcher: every component that needs to observe and handle ZNode state changes must register a ZooKeeperListener with the ZooKeeperWatcher; it also provides the ability to operate on nodes in ZooKeeper;

ActiveMasterManager: the management object for the active master; it watches for changes to the master znode in ZooKeeper;

RpcServer: the component that provides RPC services; the concrete service is backed by an RpcEngine;

InfoServer: a web server that answers requests to http://MasterHost:60010; it is essentially an embedded Jetty web server;

RegionServerTracker: tracks the state of online region servers. If an RS znode is deleted, it uses ServerManager to expire that region server and remove it from the online-servers list;

DrainingServerTracker: tracks the state of draining region servers;

MasterFileSystem: abstracts the Master's operations on the underlying file system;

ServerManager: manages region servers; it maintains the lists of online and offline servers, handles RS startup and shutdown, receives the load reports from each RS, and performs the actual opening and closing of regions;

AssignmentManager: responsible for region assignment and for maintaining region states;

CatalogTracker: the tracker for -ROOT- and .META.; the concrete work is done by RootRegionTracker and metaNodeTracker, where the former tracks the state of the "root-region-server" znode and the latter tracks the ZNode corresponding to .META.;

MemoryBoundedLogMessageBuffer: stores fatal-error messages reported by region servers; once the buffer size limit is exceeded, the oldest messages are automatically evicted;

ExecutorService: the event executor; different kinds of events are submitted to different queues to await execution, and each event type gets its own default resources. Note that this is org.apache.hadoop.hbase.executor.ExecutorService, not java.util.concurrent.ExecutorService;

LoadBalancer: balances the region load across region servers;

BalancerChore: periodically invokes master.balance();

CatalogJanitor: periodically cleans up the parent-region entries left in .META. after splits;

LogCleaner: periodically cleans up logs under the .oldlogs directory;

HFileCleaner: periodically cleans up HFiles under the .archive directory;

MasterCoprocessorHost: provides the execution environment and framework for Master-side coprocessors, which are wrapped in MasterEnvironment objects. When an action occurs, MasterCoprocessorHost iterates over all of its MasterEnvironments, obtains the MasterObserver inside each one, and invokes the relevant callback;

SnapshotManager: manages table snapshots;

HealthCheckChore: not a required component; it is started only if hbase.node.health.script.location is configured. It periodically runs the configured script to check node health, and if the number of failures within the failure window reaches the threshold, the master is stopped.
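
Several of these components are small, self-contained utilities. To illustrate the idea behind MemoryBoundedLogMessageBuffer, here is a minimal, hypothetical sketch of a byte-bounded message buffer that evicts the oldest entries once a size cap is exceeded; the class and method names below are mine, not HBase's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a memory-bounded message buffer: once the total number of
// buffered characters exceeds maxSizeBytes, the oldest messages are
// dropped. This mirrors the idea behind MemoryBoundedLogMessageBuffer,
// not its real code (which also tracks timestamps and uses UTF-8 sizes).
public class BoundedMessageBuffer {
  private final long maxSizeBytes;
  private long usedBytes = 0;
  private final Deque<String> messages = new ArrayDeque<>();

  public BoundedMessageBuffer(long maxSizeBytes) {
    this.maxSizeBytes = maxSizeBytes;
  }

  public synchronized void add(String msg) {
    messages.addLast(msg);
    usedBytes += msg.length();
    // evict oldest entries until we fit (always keep at least one)
    while (usedBytes > maxSizeBytes && messages.size() > 1) {
      usedBytes -= messages.removeFirst().length();
    }
  }

  public synchronized int count() {
    return messages.size();
  }

  public synchronized String oldest() {
    return messages.peekFirst();
  }

  public static void main(String[] args) {
    BoundedMessageBuffer buf = new BoundedMessageBuffer(10);
    buf.add("aaaa");   // 4 chars buffered
    buf.add("bbbb");   // 8 chars buffered
    buf.add("cccc");   // 12 > 10, so "aaaa" is evicted
    System.out.println(buf.count() + " " + buf.oldest());  // prints "2 bbbb"
  }
}
```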

Master Startup Source-Code Analysis

The entry point for Master startup is the main() method of org.apache.hadoop.hbase.master.HMaster, so the analysis starts there. main() expects one argument, start or stop. It first logs the HBase version information and then calls HMasterCommandLine's doMain() method:

 public static void main(String [] args) {
    VersionInfo.logVersion();
    new HMasterCommandLine(HMaster.class).doMain(args);
  }

Next, HMasterCommandLine.doMain(). HMasterCommandLine extends ServerCommandLine, which implements the Tool interface; it parses the HBase Master command-line arguments and starts the Master thread:

public void doMain(String args[]) {
    try {
      int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
      if (ret != 0) {
        System.exit(ret);
      }
    } catch (Exception e) {
      LOG.error("Failed to run", e);
      System.exit(-1);
    }
  }

Now look at ToolRunner.run(): this method ultimately calls back into tool.run(). Since the tool passed in above is "this", i.e. the HMasterCommandLine instance, it effectively calls back HMasterCommandLine.run():

public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
        if (conf == null) {
            conf = new Configuration();
        }

        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        tool.setConf(conf);
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
    }
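
The indirection above is Hadoop's standard Tool callback pattern: the runner owns the generic setup, while the Tool implementation owns the command logic. Stripped of the Hadoop classes, the pattern can be sketched as follows; Tool, ToolRunner, and CommandLineDemo here are simplified stand-ins for illustration, not the real Hadoop API:

```java
// Simplified stand-ins for Hadoop's Tool/ToolRunner, showing only the
// callback shape: the runner receives a Tool and calls back into it.
interface Tool {
  int run(String[] args) throws Exception;
}

final class ToolRunner {
  // Hadoop's version also parses generic options into a Configuration;
  // this sketch simply forwards the arguments to the tool.
  static int run(Tool tool, String[] args) throws Exception {
    return tool.run(args);  // calls back into the caller's own object
  }
}

public class CommandLineDemo implements Tool {
  @Override
  public int run(String[] args) {
    // dispatch on the command, as HMasterCommandLine.run() does
    if (args.length == 1 && "start".equals(args[0])) {
      System.out.println("starting");
      return 0;
    }
    return 1;
  }

  public static void main(String[] args) throws Exception {
    // doMain() equivalent: hand ourselves to the runner
    int ret = ToolRunner.run(new CommandLineDemo(), new String[] {"start"});
    System.out.println("exit code " + ret);  // prints "exit code 0"
  }
}
```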

Now HMasterCommandLine.run(): it first processes the command-line arguments and converts them into the corresponding configuration settings. The argument-to-configuration mapping is:

minRegionServers → hbase.regions.server.count.min
minServers → hbase.regions.server.count.min
backup → hbase.master.backup
localRegionServers → hbase.regionservers
masters → hbase.masters

It then dispatches on the remaining command-line argument; if the argument is start, startMaster() is called:

public int run(String args[]) throws Exception {
    Options opt = new Options();
    opt.addOption("localRegionServers", true,
      "RegionServers to start in master process when running standalone");
    opt.addOption("masters", true, "Masters to start in this process");
    opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables");
    opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");

    CommandLine cmd;
    try {
      cmd = new GnuParser().parse(opt, args);
    } catch (ParseException e) {
      LOG.error("Could not parse: ", e);
      usage(null);
      return 1;
    }
    if (cmd.hasOption("minRegionServers")) {
      String val = cmd.getOptionValue("minRegionServers");
      getConf().setInt("hbase.regions.server.count.min",
                  Integer.valueOf(val));
      LOG.debug("minRegionServers set to " + val);
    }

    if (cmd.hasOption("minServers")) {
      String val = cmd.getOptionValue("minServers");
      getConf().setInt("hbase.regions.server.count.min",
                  Integer.valueOf(val));
      LOG.debug("minServers set to " + val);
    }
    if (cmd.hasOption("backup")) {
      getConf().setBoolean(HConstants.MASTER_TYPE_BACKUP, true);
    }
    if (cmd.hasOption("localRegionServers")) {
      String val = cmd.getOptionValue("localRegionServers");
      getConf().setInt("hbase.regionservers", Integer.valueOf(val));
      LOG.debug("localRegionServers set to " + val);
    }
    if (cmd.hasOption("masters")) {
      String val = cmd.getOptionValue("masters");
      getConf().setInt("hbase.masters", Integer.valueOf(val));
      LOG.debug("masters set to " + val);
    }

    List<String> remainingArgs = cmd.getArgList();
    if (remainingArgs.size() != 1) {
      usage(null);
      return 1;
    }

    String command = remainingArgs.get(0);

    if ("start".equals(command)) {
      return startMaster();
    } else if ("stop".equals(command)) {
      return stopMaster();
    } else if ("clear".equals(command)) {
      return (ZNodeClearer.clear(getConf()) ? 0 : 1);
    } else {
      usage("Invalid command: " + command);
      return 1;
    }
  }

Next, startMaster(): this method uses hbase.cluster.distributed to decide between standalone and distributed mode. If hbase.cluster.distributed is false, HBase runs in standalone mode: the master thread and the region server threads are started inside the same JVM. It first starts ZooKeeper via MiniZooKeeperCluster, then starts the master and region server threads via LocalHBaseCluster; the number of master threads is given by hbase.masters and the number of region server threads by hbase.regionservers. If hbase.cluster.distributed is true, HBase runs in distributed mode: an HMaster instance is created by invoking the HMaster constructor via reflection (this indirection allows users to subclass HMaster and extend its behavior), and its start() and join() methods are then called:

 private int startMaster() {
    Configuration conf = getConf();
    try {
      if (LocalHBaseCluster.isLocal(conf)) {
        final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
        File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));
        int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
        if (zkClientPort == 0) {
          throw new IOException("No config value for "
              + HConstants.ZOOKEEPER_CLIENT_PORT);
        }
        zooKeeperCluster.setDefaultClientPort(zkClientPort);
        int zkTickTime = conf.getInt(HConstants.ZOOKEEPER_TICK_TIME, 0);
        if (zkTickTime > 0) {
          zooKeeperCluster.setTickTime(zkTickTime);
        }
        ZKUtil.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
          HConstants.ZK_SERVER_KERBEROS_PRINCIPAL, null);
        int clientPort = zooKeeperCluster.startup(zkDataPath);
        if (clientPort != zkClientPort) {
          String errorMsg = "Could not start ZK at requested port of " +
            zkClientPort + ".  ZK was started at port: " + clientPort +
            ".  Aborting as clients (e.g. shell) will not be able to find " +
            "this ZK quorum.";
          System.err.println(errorMsg);
          throw new IOException(errorMsg);
        }
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
                 Integer.toString(clientPort));
        int localZKClusterSessionTimeout =
            conf.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 10*1000);
        conf.setInt(HConstants.ZK_SESSION_TIMEOUT, localZKClusterSessionTimeout);
        LocalHBaseCluster cluster = new LocalHBaseCluster(conf, conf.getInt("hbase.masters", 1),
          conf.getInt("hbase.regionservers", 1), LocalHMaster.class, HRegionServer.class);
        ((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
        cluster.startup();
        waitonMasterThreads(cluster);
      } else {
        logProcessInfo(getConf());
        HMaster master = HMaster.constructMaster(masterClass, conf);
        if (master.isStopped()) {
          LOG.info("Won't bring the Master up as a shutdown is requested");
          return 1;
        }
        master.start();
        master.join();
        if(master.isAborted())
          throw new RuntimeException("HMaster Aborted");
      }
    } catch (Throwable t) {
      LOG.error("Master exiting", t);
      return 1;
    }
    return 0;
  }
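
HMaster.constructMaster, referenced above, instantiates the master class via reflection so that a subclass named in the configuration can be substituted for HMaster itself. A dependency-free sketch of that pattern follows; Server, CustomServer, and constructServer are hypothetical names invented for illustration (the real method looks up the (Configuration) constructor of the given class):

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Sketch of reflective construction: given a Class object (possibly a
// user-supplied subclass), look up its (Properties) constructor and
// invoke it. HMaster.constructMaster does the same with Configuration.
public class ReflectiveConstruction {
  static class Server {
    final Properties conf;
    Server(Properties conf) { this.conf = conf; }
    String name() { return "Server"; }
  }

  // A user-supplied subclass extending the base behavior.
  static class CustomServer extends Server {
    CustomServer(Properties conf) { super(conf); }
    @Override String name() { return "CustomServer"; }
  }

  static Server constructServer(Class<? extends Server> cls, Properties conf)
      throws Exception {
    // find the one-argument constructor and invoke it reflectively
    Constructor<? extends Server> c = cls.getDeclaredConstructor(Properties.class);
    return c.newInstance(conf);
  }

  public static void main(String[] args) throws Exception {
    Server s = constructServer(CustomServer.class, new Properties());
    System.out.println(s.name());  // prints "CustomServer"
  }
}
```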

Finally, the HMaster constructor. It first copies the Configuration object and then performs the following setup steps: 1. disable the block cache on the master; 2. set the server-side connection retry count; 3. create the rpcServer (this uses Java NIO; a separate post will cover Java NIO); 4. set the serverName; 5. log in to ZooKeeper; 6. initialize the server principal (Kerberos login); 7. set the thread name; 8. register the user-configured ReplicationLogCleaner class; 9. configure parameters for task trackers; 10. create the ZooKeeperWatcher; 11. start the rpcServer threads; 12. create metricsMaster; 13. decide whether to run health checks:

  public HMaster(final Configuration conf)
  throws IOException, KeeperException, InterruptedException {
    this.conf = new Configuration(conf);

    // disable the block cache on the master
    this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    FSUtils.setupShortCircuitRead(conf);
    String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
      conf.get("hbase.master.dns.interface", "default"),
      conf.get("hbase.master.dns.nameserver", "default")));
    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
    }
    String bindAddress = conf.get("hbase.master.ipc.address");
    if (bindAddress != null) {
      initialIsa = new InetSocketAddress(bindAddress, port);
      if (initialIsa.getAddress() == null) {
        throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
      }
    }
    String name = "master/" + initialIsa.toString();

    // set the server-side connection retry count
    HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
    int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
      conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));

    // create the rpcServer
    this.rpcServer = new RpcServer(this, name, getServices(),
      initialIsa, // BindAddress is IP we got for this server.
      conf,
      new FifoRpcScheduler(conf, numHandlers));
    this.isa = this.rpcServer.getListenerAddress();

    // set the serverName
    this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
    this.rsFatals = new MemoryBoundedLogMessageBuffer(
      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

    // log in to ZooKeeper
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, this.isa.getHostName());
    
    // initialize the server principal (Kerberos login)
    UserProvider provider = UserProvider.instantiate(conf);
    provider.login("hbase.master.keytab.file",
      "hbase.master.kerberos.principal", this.isa.getHostName());

    LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
        ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));

    // set the thread name
    setName(MASTER + ":" + this.serverName.toShortString());

    // register the user-configured ReplicationLogCleaner class
    Replication.decorateMasterConfiguration(this.conf);

    // configure parameters for task trackers
    if (this.conf.get("mapred.task.id") == null) {
      this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
    }

    // create the ZooKeeperWatcher
    this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
    // start the rpcServer threads
    this.rpcServer.startThreads();
    this.pauseMonitor = new JvmPauseMonitor(conf);
    this.pauseMonitor.start();

    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

    this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
    
    // create metricsMaster
    this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));

    this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

    // decide whether to run health checks
    int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
    if (isHealthCheckerConfigured()) {
      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
    }

    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    Class publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.Publisher.class);

    if (shouldPublish) {
      if (publisherClass == null) {
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
            " is not set - not publishing status");
      } else {
        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
        Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
      }
    }
  }
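
The HealthCheckChore created near the end of the constructor stops the master once the health script has failed a threshold number of times within a sliding failure window. The window-counting logic can be sketched as follows; this is a hypothetical simplification with invented names (the real chore additionally runs the external script configured by hbase.node.health.script.location on a schedule):

```java
// Sketch of the failure-window logic behind a health-check chore:
// count failures, reset the count once the window that started at the
// first failure has elapsed, and report "unhealthy" at the threshold.
public class FailureWindow {
  private final int threshold;       // failures tolerated within one window
  private final long windowMillis;   // length of the failure window
  private int failures = 0;
  private long windowStart = -1;

  public FailureWindow(int threshold, long windowMillis) {
    this.threshold = threshold;
    this.windowMillis = windowMillis;
  }

  /** Record one failed health check at time {@code now}; returns true
   *  once the threshold is reached within the current window. */
  public boolean recordFailure(long now) {
    if (windowStart < 0 || now - windowStart > windowMillis) {
      windowStart = now;   // start a fresh window
      failures = 0;
    }
    failures++;
    return failures >= threshold;
  }

  public static void main(String[] args) {
    FailureWindow fw = new FailureWindow(3, 1000);
    System.out.println(fw.recordFailure(0));     // false: 1st failure
    System.out.println(fw.recordFailure(100));   // false: 2nd failure
    System.out.println(fw.recordFailure(200));   // true: threshold hit
    System.out.println(fw.recordFailure(5000));  // false: new window
  }
}
```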
