参考学习尚硅谷Hive源码篇,自我总结
- run方法——解析用户参数,包含"-e -f -v -database"等等
- executeDriver 方法——识别 hivesql语句的"> “和”;"
- processLine方法——解析单行 HQL
- processCmd方法——判别四种开头情况 -1-“quit"或者"exit”-2-“source”-3-"!"-4-HQL
- processLocalCmd 方法——HQL 执行的核心方法
- qp.run(cmd)方法——分别进去未编译(false)和已编译(true)的run方法
- runInternal方法—— 1.编译 HQL 语句 (包含解析器、编译器和优化器);2.执行 (执行器)
int ret = new CliDriver().run(args);
//*******************************主类的 run 方法************************************
public int run(String[] args) throws Exception {
//解析用户参数,包含"-e -f -v -database"等等
if (!oproc.process_stage2(ss)) { return 2;}
// execute cli driver work
try {return executeDriver(ss, conf, oproc); }
//*******************************executeDriver 方法************************************
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc){
//读取客户端的输入 HQL
while ((line = reader.readLine(curprompt + "> ")) != null) {
//以按照“;”分割的方式解析
if (line.trim().endsWith(";") && !line.trim().endsWith("\;")) {
line = prefix + line;
ret = cli.processLine(line, true);
//*******************************processLine 方法************************************
public int processLine(String line, boolean allowInterrupting){
//解析单行 HQL
ret = processCmd(command);
//*******************************processCmd 方法************************************
public int processCmd(String cmd){
//1.如果命令为"quit"或者"exit",则退出
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
//2.如果命令为"source"开头,则表示执行 HQL 文件,继续读取文件并解析
} else if (tokens[0].equalsIgnoreCase("source")) {
//3.如果命令以"!"开头,则表示用户需要执行 Linux 命令
} else if (cmd_trimmed.startsWith("!")) {
//4.以上三者都不是,则认为用户输入的为"select ..."正常的增删改查 HQL 语句,则进行 HQL 解析
} else {// Let Driver strip comments using sql parser
ret = processLocalCmd(cmd, proc, ss);
//*******************************processLocalCmd 方法************************************
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionStatess){
//HQL 执行的核心方法
ret = qp.run(cmd).getResponseCode();
//*******************************qp.run(cmd)方法************************************
public CommandProcessorResponse run(String command) { return run(command, false);}//false表示当前未编译
public CommandProcessorResponse run(String command, boolean alreadyCompiled) {
try {runInternal(command, alreadyCompiled); //*******************************runInternal 方法************************************
private void runInternal(String command, boolean alreadyCompiled){
//1.编译 HQL 语句 (包含解析器、编译器和优化器)
compileInternal(command, true);
//2.执行 (执行器)
execute();
SQL Parser解析器——HQL生成AST(抽象语法树)
compileInternal 和compile方法——传入HQL语句,调用parse方法
parse 方法
2.1 构建词法解析器 new HiveLexerX
2.2 将 HQL 中的关键词替换为 Token
2.3 将Token组合成AST抽象语法树
//*******************************compileInternal 方法*******************************
private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
compile(command, true, deferClose);
//*******************************compile 方法*******************************
private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
//HQL 生成 AST
ASTNode tree;
try {tree = ParseUtils.parse(command, ctx);
//*******************************parse 方法*******************************
public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
//1.构建词法解析器
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
//2.将 HQL 中的关键词替换为 Token
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
HiveParser parser = new HiveParser(tokens);
//3.进行语法解析,生成最终的
AST r = parser.statement();
ASTNode tree = (ASTNode) r.getTree();
说明:Antlr 框架
Hive 使用 Antlr 实现 SQL 的词法和语法解析。Antlr 是一种语言识别的工具,可以用来构造领域语言。 这里不详细介绍 Antlr,只需要了解使用 Antlr 构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可,Antlr 完成了词法分析、 语法分析、语义分析、中间代码生成的过程。 Hive 中语法规则的定义文件在 0.10 版本以前是 Hive.g 一个文件,随着语法规则越来越复杂,由语法规则生成的 Java 解析类可能超过 Java 类文件的最大上限,0.11 版本将 Hive.g 拆成了 5 个文件,词法规则 HiveLexer.g 和语法规则的 4 个文件 SelectClauseParser.g, FromClauseParser.g,IdentifiersParser.g,HiveParser.g。
- syntax tree抽象树 --> Parse tree解析树 --> Operator Tree优化树
- 对 OperatorTree 进行逻辑优化(LogicalOptimizer);
- 将 OperatorTree 转换为 TaskTree(任务树);
- 对 TaskTree 进行物理优化(PhysicalOptimizer)。
之所以将这 4 个步骤写在一起,是因为这几个步骤在源码中存在于一个analyzeInternal方法中。
//*******************************compile 方法*******************************
private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
//HQL 生成 AST
ASTNode tree;
try {tree = ParseUtils.parse(command, ctx);
//进一步解析抽象语法树
sem.analyze(tree, ctx);
//*******************************analyze 方法*******************************
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
initCtx(ctx); init(true);
analyzeInternal(ast); }
//*******************************analyzeInternal 方法*******************************
void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {
LOG.info("Starting Semantic Analysis");
// 1-6. syntax tree --> Parse tree --> Operator Tree
//处理 AST(抽象语法树),转换为 QueryBlock
if (!genResolvedParseTree(ast, plannerCtx)) { return;
Operator sinkOp = genOPTree(ast, plannerCtx);
// 7. Perform Logical optimization:对操作树执行逻辑优化
//创建优化器
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
//执行优化
pCtx = optm.optimize();
// 9. Optimize Physical op tree & Translate to target execution engine (MR,// TEZ..):执行物理优化
//compile 为抽象方法,对应的实现类分别为 MapReduceCompiler、TezCompiler 和SparkCompiler
compiler.compile(pCtx, rootTasks, inputs, outputs);
Execution执行器——提交任务并执行
- execute方法——获取MR临时工作目录
- launchTask方法 --> runSequential方法 --> executeTask方法
- execute 方法——定义Partitoner、Mapper、Reducer、实例化Job、提交Job
//*******************************execute 方法*******************************
private void execute() throws CommandProcessorResponse {
//1.构建任务:根据任务树构建 MrJob
setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
//2.启动任务
TaskRunner runner = launchTask(task, queryId, noName, jobname,jobs, driverCxt);
//打印结果中最后的 OK
if (console != null) { console.printInfo("OK");}
//*******************************launchTask方法*******************************
private TaskRunner launchTask(Task extends Serializable> tsk, String queryId, boolean noName, String jobname, int jobs, DriverContext cxt) throws HiveException {
tskRun.runSequential();
//*******************************runSequential 方法*******************************
public void runSequential() {
exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory());
//*******************************executeTask方法*******************************
public int executeTask(HiveHistory hiveHistory) {
int retval = execute(driverContext);
//*******************************execute 方法*******************************
public int execute(DriverContext driverContext) {
//设置 MR 任务的 InputFormat、OutputFormat 等等这些 MRJob 的执行类
int ret = super.execute(driverContext);
//构建执行 MR 任务的命令
String isSilent = "true".equalsIgnoreCase(System .getProperty("test.silent")) ? "-nolog" : "";
String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
String cmdLine = hadoopExec + " jar " + jarCmd + " -plan " + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
... ...
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));



