有一篇文章讲 Hook:《【HIVE】Hook(钩子)函数从入门到放弃》。
HookRunner
HookRunner 持有各类 hook 的列表,并负责从配置中加载这些 hook。
在 initialize 方法里,根据各个配置项加载对应类型的 hook(若开启了 HiveServer2 指标,还会额外加入 MetricsQueryLifeTimeHook):
/**
 * Populates every hook list from its configuration variable.
 *
 * <p>The {@code initialized} flag is raised before any hook is loaded, so once it is set, later
 * calls do nothing (even if a load threw part-way through the first call).
 */
public void initialize() {
  if (!initialized) {
    initialized = true;
    queryHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.HIVE_QUERY_LIFETIME_HOOKS, QueryLifeTimeHook.class));
    saHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, HiveSemanticAnalyzerHook.class));
    driverRunHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class));
    // Pre-, post- and on-failure hooks all share the ExecuteWithHookContext contract.
    preExecHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.PREEXECHOOKS, ExecuteWithHookContext.class));
    postExecHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.POSTEXECHOOKS, ExecuteWithHookContext.class));
    onFailureHooks.addAll(loadHooksFromConf(HiveConf.ConfVars.ONFAILUREHOOKS, ExecuteWithHookContext.class));

    // The metrics query-lifetime hook is appended only when HS2 metrics are switched on.
    boolean metricsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED);
    if (metricsEnabled) {
      queryHooks.add(new MetricsQueryLifeTimeHook());
    }
  }
}
QueryLifeTimeHook
定义了四个回调:编译前(beforeCompile)、编译后(afterCompile)、执行前(beforeExecution)、执行后(afterExecution)。
/**
 * Hook with four callbacks around the compile and execution phases of a query.
 *
 * <p>The call sites live in the driver (not shown here); the phase each callback corresponds to is
 * taken from its name.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface QueryLifeTimeHook extends Hook {

  /** Callback for the start of query compilation. */
  void beforeCompile(QueryLifeTimeHookContext ctx);

  /**
   * Callback for the end of query compilation.
   *
   * @param hasError presumably true when compilation failed — confirm at the call site
   */
  void afterCompile(QueryLifeTimeHookContext ctx, boolean hasError);

  /** Callback for the start of query execution. */
  void beforeExecution(QueryLifeTimeHookContext ctx);

  /**
   * Callback for the end of query execution.
   *
   * @param hasError presumably true when execution failed — confirm at the call site
   */
  void afterExecution(QueryLifeTimeHookContext ctx, boolean hasError);
}
HiveSemanticAnalyzerHook
定义了两个回调:语义分析前(preAnalyze)和语义分析后(postAnalyze)。
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveSemanticAnalyzerHook extends Hook {
public ASTNode preAnalyze(
HiveSemanticAnalyzerHookContext context,
ASTNode ast) throws SemanticException;
public void postAnalyze(
HiveSemanticAnalyzerHookContext context,
List> rootTasks) throws SemanticException;
}
ReExecDriver
ReExecDriver 在构造时向 coreDriver 的 HookRunner 额外注册了一个 SemanticAnalyzerHook(HandleReOptimizationExplain),并用 coreDriver 初始化每个重执行插件。
/**
 * Creates a re-executing driver around a single core {@link Driver}.
 *
 * <p>Registers a {@code HandleReOptimizationExplain} semantic-analyzer hook on the core driver's
 * HookRunner, then lets every plugin initialize itself against that core driver.
 *
 * @param queryState state of the query to run
 * @param userName user submitting the query
 * @param queryInfo query info forwarded to the core driver
 * @param plugins re-execution plugins, initialized in list order. This must be typed: with the raw
 *     {@code ArrayList} the for-each loop below would yield {@code Object} and not compile.
 */
public ReExecDriver(QueryState queryState, String userName, QueryInfo queryInfo,
    ArrayList<IReExecutionPlugin> plugins) {
  this.queryState = queryState;
  // NOTE(review): the meaning of the trailing null argument depends on Driver's constructor — confirm.
  coreDriver = new Driver(queryState, userName, queryInfo, null);
  coreDriver.getHookRunner().addSemanticAnalyzerHook(new HandleReOptimizationExplain());
  this.plugins = plugins;
  for (IReExecutionPlugin p : plugins) {
    p.initialize(coreDriver);
  }
}
DriverFactory
/**
 * Creates the driver for a query.
 *
 * <p>When {@code hive.query.reexecution.enabled} is false, returns a plain {@link Driver}.
 * Otherwise it parses the comma-separated {@code hive.query.reexecution.strategies} list into
 * plugins and returns a {@link ReExecDriver}. Empty entries are skipped.
 *
 * @param queryState state of the query; its conf supplies both settings
 * @param userName user submitting the query
 * @param queryInfo query info forwarded to the driver
 * @return the driver to use for this query
 * @throws RuntimeException if a strategy name is not recognized
 */
public static IDriver newDriver(QueryState queryState, String userName, QueryInfo queryInfo) {
  boolean enabled = queryState.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED);
  if (!enabled) {
    return new Driver(queryState, userName, queryInfo);
  }
  String strategies = queryState.getConf().getVar(ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES);
  // Lower-case with Locale.ROOT. Under e.g. a Turkish default locale, "I" lower-cases to a
  // dotless "ı", and "REOPTIMIZE" would no longer match "reoptimize".
  strategies = Strings.nullToEmpty(strategies).trim().toLowerCase(java.util.Locale.ROOT);
  ArrayList<IReExecutionPlugin> plugins = new ArrayList<>();
  for (String entry : strategies.split(",")) {
    // Trim every entry, not just the whole string, so "overlay, reoptimize" resolves the same as
    // "overlay,reoptimize" instead of failing on " reoptimize".
    String name = entry.trim();
    if (name.isEmpty()) {
      continue;
    }
    plugins.add(buildReExecPlugin(name));
  }
  return new ReExecDriver(queryState, userName, queryInfo, plugins);
}
// Master switch: when false, DriverFactory.newDriver returns a plain Driver without re-execution.
HIVE_QUERY_REEXECUTION_ENABLED("hive.query.reexecution.enabled", true,
"Enable query reexecutions"),
// Comma-separated plugin names understood by DriverFactory.buildReExecPlugin. The help text uses
// real "\n" escapes between entries; the extracted copy had lost the backslashes ("used:n").
HIVE_QUERY_REEXECUTION_STRATEGIES("hive.query.reexecution.strategies", "overlay,reoptimize",
"comma separated list of plugin can be used:\n"
+ " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n"
+ " reoptimize: collects operator statistics during execution and recompile the query after a failure"),
DriverFactory.buildReExecPlugin
/**
 * Instantiates the re-execution plugin registered under the given strategy name.
 *
 * @param name strategy name; must match exactly, e.g. {@code "overlay"} or {@code "reoptimize"}
 * @return a new plugin instance for that strategy
 * @throws RuntimeException if no plugin is registered under {@code name}
 */
private static IReExecutionPlugin buildReExecPlugin(String name) throws RuntimeException {
  switch (name) {
    case "overlay":
      return new ReExecutionOverlayPlugin();
    case "reoptimize":
      return new ReOptimizePlugin();
    default:
      // The message names the config variable so the user knows which setting to fix.
      throw new RuntimeException(
          "Unknown re-execution plugin: " + name + " (" + ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")");
  }
}
ReExecutionOverlayPlugin
ReExecutionOverlayPlugin adds an OnFailureHook (LocalHook) and reads the `reexec.overlay` config subtree.
/**
 * Binds this plugin to the driver, registers its failure hook, and captures the
 * {@code reexec.overlay} configuration subtree.
 *
 * <p>Per the {@code hive.query.reexecution.strategies} help text, that subtree is the config
 * overlay applied when an execution errors out.
 */
@Override
public void initialize(Driver driver) {
  this.driver = driver;
  LocalHook failureHook = new LocalHook();
  driver.getHookRunner().addOnFailureHook(failureHook);
  subtree = driver.getConf().subtree("reexec.overlay");
}
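ReOptimizePlugin
ReOptimizePlugin registers three hooks: a LocalHook as an on-failure hook, plus one OperatorStatsReaderHook that is registered both as an on-failure hook and as a post hook.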
/**
 * Binds this plugin to the driver and wires up operator-statistics collection.
 *
 * <p>Registers a {@code LocalHook} as an on-failure hook. It then registers a single
 * {@code OperatorStatsReaderHook} both as an on-failure hook and via {@code addPostHook}. Finally it
 * installs the configured stats source on the driver.
 */
@Override
public void initialize(Driver driver) {
  this.coreDriver = driver;
  driver.getHookRunner().addOnFailureHook(new LocalHook());

  // The same reader instance is attached to both the failure and the post-hook paths.
  this.statsReaderHook = new OperatorStatsReaderHook();
  driver.getHookRunner().addOnFailureHook(this.statsReaderHook);
  driver.getHookRunner().addPostHook(this.statsReaderHook);

  // Whether stats are also collected on success is controlled by this config flag.
  this.alwaysCollectStats = driver.getConf()
      .getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
  this.statsReaderHook.setCollectOnSuccess(this.alwaysCollectStats);

  driver.setStatsSource(StatsSources.getStatsSource(driver.getConf()));
}



