栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hive3.1.2源码阅读--处理sql语句 processLine-->processCmd

Hive3.1.2源码阅读--处理sql语句 processLine-->processCmd

    由上文进入processLine方法中
    1.1 该方法用于在特殊情况下可以中止作业的执行
    1.2 定义了两种作业中止的方式
    ① ctrl+c 当连续两次触发 Ctrl+c 作业中断
    ② kill -2
    1.3 HadoopJobExecHelper 这个类中保存了正在运行的hadoop job;通过其方法killRunningJobs杀死所有正在运行的任务根据;切割sql中的执行语句
    2.1 进入processCmd真正执行单条sql语句
//processLine可以理解为作业上了一道保险,确保如果因为特殊情况需要中止作业的话有结束的方法。
  public int processLine(String line, boolean allowInterrupting) {
    SignalHandler oldSignal = null;
    Signal interruptSignal = null;
    // 1.默认传入的allowInterrupting为true
    // 	 若allowInterrupting为true,表示作业可以中断
    //   若allowInterrupting为false,表示作业不可以中断
    if (allowInterrupting) {
      // Remember all threads that were running at the time we started line processing.
      // Hook up the custom Ctrl+C handler while processing this line
      // 定义接收到的信号 (可以用Ctrl+c ,和kill -2两种方式触发信号 )
      interruptSignal = new Signal("INT");
      oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
        private boolean interruptRequested;

        @Override
        public void handle(Signal signal) {
          boolean initialRequest = !interruptRequested;
          interruptRequested = true;

          // 当连续两次触发 Ctrl+c 作业中断
          if (!initialRequest) {
            console.printInfo("Exiting the JVM");
            System.exit(127);
          }

          // 作业中断返回的语句
          console.printInfo("Interrupting... Be patient, this might take some time.");
          console.printInfo("Press Ctrl+C again to kill JVM");

          // 杀死所有正在运行的任务,HadoopJobExecHelper 这个类中保存了正在运行的hadoop job ,
          // 任务会在这个类的progress()方法中循环获取执行进度,并且保存了所有这个jvm 中执行的任务。
          HadoopJobExecHelper.killRunningJobs();
          TezJobExecHelper.killRunningJobs();
          HiveInterruptUtils.interrupt();
        }
      });
    }

    try {
      int lastRet = 0, ret = 0;

      // we can not use "split" function directly as ";" may be quoted
      List commands = splitSemicolon(line);

      String command = "";
      for (String oneCmd : commands) {

        if (StringUtils.endsWith(oneCmd, "\")) {
          command += StringUtils.chop(oneCmd) + ";";
          continue;
        } else {
          command += oneCmd;
        }
        if (StringUtils.isBlank(command)) {
          continue;
        }
        // 真正执行命令的类,根据命令不同做出不同对应
        // 包括对退出,执行sql文件,执行linux命令,和sql等命令处理
        ret = processCmd(command);//解析单行HQL
        command = "";
        lastRet = ret;
        boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
        if (ret != 0 && !ignoreErrors) {
          return ret;
        }
      }
      return lastRet;
    } finally {
      // 处理完行之后,恢复旧的处理程序
      if (oldSignal != null && interruptSignal != null) {
        Signal.handle(interruptSignal, oldSignal);
      }
    }
  }

    进入该方法中查看相关的执行逻辑
    3.1 刷新打印流,防止上一个命令输出
    3.2 将传入的单句sql去除注释
    3.3 按照空格、制表符切割,存入token数组中
    3.4 判断切割出来的第一个字段是什么来决定相应的处理方式
    3.4.1 如果输入的sql是quit,则退出
    3.4.2 如果是source,则是执行跟在后面的文件,最后也是交给processFile去执行
    3.4.3 如果是shell脚本,则调用ShellCmdExecutor
    3.4.4 其他情况则是sql,通过processLocalCmd去执行
  public int processCmd(String cmd) {
    CliSessionState ss = (CliSessionState) SessionState.get();
    ss.setLastCommand(cmd);

    ss.updateThreadName();

    // 刷新打印流,防止上个命令输出
    ss.err.flush();
    String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();//删除注释
    // tokenizeCmd将用户输入的指令从空格,制表符等出断开,得到token数组
    String[] tokens = tokenizeCmd(cmd_trimmed);
    int ret = 0;

    // 如果命令是`quit`或`exit`,则退出
    if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

      // if we have come this far - either the previous commands
      // are all successful or this is command line. in either case
      // this counts as a successful run
      ss.close();
      System.exit(0);

      // 如果命令是`source`开头,则校验`source`命令后面的文件是否存在,存在则执行`processFile`
    } else if (tokens[0].equalsIgnoreCase("source")) {
      String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
      cmd_1 = new VariableSubstitution(new HiveVariableSource() {
        @Override
        public Map getHiveVariable() {
          return SessionState.get().getHiveVariables();
        }
      }).substitute(ss.getConf(), cmd_1);

      File sourceFile = new File(cmd_1);
      // 如果source后跟的参数不是文件,就报错
      if (! sourceFile.isFile()){
        console.printError("File: "+ cmd_1 + " is not a file.");
        ret = 1;
      } else {
        try {
          // 不管再多操作,只要他是文件,最后也是交给processFile去执行
          ret = processFile(cmd_1);
        } catch (IOException e) {
          // 处理失败了会输出报错的原因
          console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
            stringifyException(e));
          ret = 1;
        }
      }
      // 如果命令是以感叹号!开头,表明是`shell`命令,这时候直接调用`shell`命令执行器执行
    } else if (cmd_trimmed.startsWith("!")) {//shell脚本
      // for shell commands, use unstripped command
      String shell_cmd = cmd.trim().substring(1);
      shell_cmd = new VariableSubstitution(new HiveVariableSource() {
        @Override
        public Map getHiveVariable() {
          return SessionState.get().getHiveVariables();
        }
      }).substitute(ss.getConf(), shell_cmd);

      // shell_cmd = "/bin/bash -c '" + shell_cmd + "'";
      try {
        ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
        ret = executor.execute();
        if (ret != 0) {
          console.printError("Command failed with exit code = " + ret);
        }
      } catch (Exception e) {
        console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    }  else { // local mode
      // 去除上面各种类型,其他sql的主要执行方法为processLocalCmd
      try {
        //直接解析输入的sql
        try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
          if (proc instanceof IDriver) {
            // Let Driver strip comments using sql parser
            ret = processLocalCmd(cmd, proc, ss);
          } else {
            ret = processLocalCmd(cmd_trimmed, proc, ss);
          }
        }
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    ss.resetThreadName();
    // 如果最后返回的ret为1,那么就表示执行失败。
    return ret;
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775389.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号