The story starts with a Hive execution pre-hook. The requirement: decide between MR and Spark based on the total size of the tables/partitions a SQL statement touches. But even though the pre-hook verifiably set hive.execution.engine to spark, the query still ran on the default mr engine.
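For context, an execution pre-hook is wired in through session configuration, roughly like this (the hook class name is a hypothetical placeholder, not a real class):

```sql
-- Register an execution pre-hook; com.example.EngineSelectorPreHook is hypothetical.
set hive.exec.pre.hooks=com.example.EngineSelectorPreHook;
-- The default engine the pre-hook was supposed to override:
set hive.execution.engine=mr;
```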
Searching the Hive source globally for hive.exec.pre.hooks, the entry point turns out to be in the HiveConf ConfVars enum:

PREEXECHOOKS("hive.exec.pre.hooks", "",
    "Comma-separated list of pre-execution hooks to be invoked for each statement. \n" +
    "A pre-execution hook is specified as the name of a Java class which implements the \n" +
    "org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface."),
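As a sketch of what such a hook looks like: the two interfaces below are hypothetical, self-contained stand-ins that only mirror the shape of the real org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext / HookContext pair, so the example compiles without Hive on the classpath.

```java
// Hypothetical stand-ins for Hive's HookContext / ExecuteWithHookContext,
// reduced to the call shape relevant here.
interface HookContext {
    String getOperationName();
}

interface ExecuteWithHookContext {
    void run(HookContext ctx);
}

public class LoggingPreHook implements ExecuteWithHookContext {
    int invocations = 0;

    @Override
    public void run(HookContext ctx) {
        // Called once per statement, just before execution starts --
        // which, per the trace that follows, is after the plan is built.
        invocations++;
        System.out.println("pre-hook fired for: " + ctx.getOperationName());
    }

    public static void main(String[] args) {
        LoggingPreHook hook = new LoggingPreHook();
        hook.run(() -> "QUERY");
    }
}
```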
Searching the Driver class for "PREEXECHOOKS" shows that the hooks are invoked in Driver.execute(). Printing a stack trace there (manually throw an exception, catch it, and print its stack trace) yields something like the following (in this test the trace was printed from the Optimizer):

2018-04-12T19:55:21,535 INFO [1df5090e-6c7f-4751-8979-d4fd3ef5024e HiveServer2-Handler-Pool: Thread-92] optimizer.Optimizer: [FLAG_15_1] stack trace java.lang.RuntimeException: t
at org.apache.hadoop.hive.ql.optimizer.Optimizer.initialize(Optimizer.java:66)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:11246)
at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:286)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:259)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:814)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1286)
at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1265)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:204)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:290)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:320)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:530)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:517)
Moving to Driver.compile:814, we find the semantic-analyzer logic:
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
List<HiveSemanticAnalyzerHook> saHooks =
    getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
        HiveSemanticAnalyzerHook.class);

// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
// analyzer (this is when the connection to the metastore is made) but before we analyze,
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();

// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
  HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
  hookCtx.setConf(conf);
  hookCtx.setUserName(userName);
  hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
  hookCtx.setCommand(command);
  hookCtx.setHiveOperation(queryState.getHiveOperation());
  hookCtx.setQueryState(queryState);
  hookCtx.setContext(ctx);
  for (HiveSemanticAnalyzerHook hook : saHooks) {
    tree = hook.preAnalyze(hookCtx, tree);
  }
  sem.analyze(tree, ctx);
  hookCtx.update(sem);
  for (HiveSemanticAnalyzerHook hook : saHooks) {
    hook.postAnalyze(hookCtx, sem.getAllRootTasks());
  }
} else {
  sem.analyze(tree, ctx);
}
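The saHooks loop shows the call shape a semantic-analyzer hook has to follow: preAnalyze before sem.analyze, postAnalyze after. The classes below are hypothetical, self-contained stand-ins for HiveSemanticAnalyzerHook, its context, ASTNode, and Task (only the call shape matches the real interfaces), to illustrate why a conf change here still takes effect.

```java
import java.util.List;

// Hypothetical stand-ins; only the preAnalyze/postAnalyze call shape
// matches Hive's HiveSemanticAnalyzerHook.
class ASTNode {}
class Task {}

interface SemanticAnalyzerHookContext {
    void setProperty(String key, String value);
}

interface SemanticAnalyzerHook {
    ASTNode preAnalyze(SemanticAnalyzerHookContext ctx, ASTNode ast);
    void postAnalyze(SemanticAnalyzerHookContext ctx, List<Task> rootTasks);
}

public class EngineSelectorHook implements SemanticAnalyzerHook {
    @Override
    public ASTNode preAnalyze(SemanticAnalyzerHookContext ctx, ASTNode ast) {
        // Runs BEFORE sem.analyze(tree, ctx), so a conf set here can still
        // influence plan generation -- unlike an execution pre-hook.
        ctx.setProperty("hive.execution.engine", "spark");
        return ast;
    }

    @Override
    public void postAnalyze(SemanticAnalyzerHookContext ctx, List<Task> rootTasks) {
        // Runs after analysis; rootTasks already reflect the chosen engine.
    }
}
```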
From this we can conclude: by the time the execution pre-hook runs, the choice between mr and spark has long been made, so setting the conf there cannot work. The switch has to happen in the semantic-analyzer pre-hook instead. The catch is that before sem.analyze() runs, the input tables/partitions have not been resolved yet, so their sizes are not available. One way out is to modify the Hive source so that the two arguments needed by sem.analyze(tree, ctx) are also passed to the semantic-analyzer pre-hook; inside the hook, run analyze() first to obtain the inputs, look up their total size in the Hive metastore, and then set the conf to pick mr or spark.
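The size-based decision itself can be sketched as below. The threshold value, the chooseEngine() helper, and the direction of the mapping (large inputs to spark) are all assumptions for illustration; the post does not state which way the author maps data volume to engine, and in a real hook the sizes would come from the metastore via the analyzed inputs.

```java
import java.util.Map;

// Sketch of a size-based engine choice. THRESHOLD_BYTES and the
// large-input -> spark direction are assumptions, not the author's values.
public class EngineChooser {
    // Assumed cutoff: 10 GB.
    static final long THRESHOLD_BYTES = 10L * 1024 * 1024 * 1024;

    // inputSizes: table/partition name -> total size in bytes (from the metastore).
    static String chooseEngine(Map<String, Long> inputSizes) {
        long total = inputSizes.values().stream().mapToLong(Long::longValue).sum();
        return total >= THRESHOLD_BYTES ? "spark" : "mr";
    }

    public static void main(String[] args) {
        // A 1 MB input falls below the assumed cutoff.
        System.out.println(chooseEngine(Map.of("db.t1", 1L << 20)));
    }
}
```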
(Reader comment) Do you use Spark when the data volume is large, or when it is small? How do you split by data size?