Monday, April 16, 2018

A Hive Hook Pitfall and How to Work Around It

The problem starts with a Hive Execution PreHook. The requirement was to choose between MR and Spark based on the total size of the tables/partitions referenced by the SQL. But even though it was confirmed that the pre-hook had set hive.execution.engine to spark, the query still ran on the default mr engine.
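For context, the pre-hook in question looked roughly like the sketch below (the class name and the size accounting are hypothetical, not the exact code used): it implements ExecuteWithHookContext, walks the query's inputs, and tries to set hive.execution.engine, which, as just described, has no effect at this point.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;

public class EngineSwitchPreHook implements ExecuteWithHookContext {
  @Override
  public void run(HookContext hookContext) throws Exception {
    HiveConf conf = hookContext.getConf();
    long totalSize = 0L;
    for (ReadEntity input : hookContext.getInputs()) {
      // sum up the sizes of the input tables/partitions here (details omitted)
    }
    // Too late: by the time a pre-execution hook runs, the query plan has
    // already been compiled for the default engine, so this never takes effect.
    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
  }
}

Such a hook is registered through the hive.exec.pre.hooks property, e.g. set hive.exec.pre.hooks=com.example.EngineSwitchPreHook; (class name again hypothetical).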
Searching the Hive source tree for hive.exec.pre.hooks, the entry point is in HiveConf's ConfVars enum:
PREEXECHOOKS("hive.exec.pre.hooks", "",
    "Comma-separated list of pre-execution hooks to be invoked for each statement. \n" +
    "A pre-execution hook is specified as the name of a Java class which implements the \n" +
    "org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface."),
Searching the Driver class for "PREEXECHOOKS" shows that the hooks are invoked from Driver.execute(). Printing a stack trace there (manually throw an exception, then catch it and print the stack trace) yields something like the following (during this test the trace was printed from Optimizer):
2018-04-12T19:55:21,535 INFO [1df5090e-6c7f-4751-8979-d4fd3ef5024e HiveServer2-Handler-Pool: Thread-92] optimizer.Optimizer: [FLAG_15_1] stack trace java.lang.RuntimeException: t
at org.apache.hadoop.hive.ql.optimizer.Optimizer.initialize(Optimizer.java:66)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:11246)
at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:286)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:259)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:814)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1286)
at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1265)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:204)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:290)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:320)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:530)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:517)
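The trace above comes from nothing fancier than the throw-and-catch trick mentioned earlier, dropped into the method under suspicion (here Optimizer.initialize(); the [FLAG_15_1] tag is arbitrary, and LOG is assumed to be an slf4j Logger on the class, added if it is not already there):

// temporary debugging code inside the method being inspected
try {
  throw new RuntimeException("t");
} catch (RuntimeException e) {
  LOG.info("[FLAG_15_1] stack trace", e);
}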
Moving to Driver.compile:814, we find the semantic-analyzer logic:
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
      List<HiveSemanticAnalyzerHook> saHooks =
          getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
              HiveSemanticAnalyzerHook.class);

      // Flush the metastore cache.  This assures that we don't pick up objects from a previous
      // query running in this same thread.  This has to be done after we get our semantic
      // analyzer (this is when the connection to the metastore is made) but before we analyze,
      // because at that point we need access to the objects.
      Hive.get().getMSC().flushCache();

      // Do semantic analysis and plan generation
      if (saHooks != null && !saHooks.isEmpty()) {
        HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
        hookCtx.setConf(conf);
        hookCtx.setUserName(userName);
        hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
        hookCtx.setCommand(command);
        hookCtx.setHiveOperation(queryState.getHiveOperation());
        hookCtx.setQueryState(queryState);
        hookCtx.setContext(ctx);
        for (HiveSemanticAnalyzerHook hook : saHooks) {
          tree = hook.preAnalyze(hookCtx, tree);
        }
        sem.analyze(tree, ctx);
        hookCtx.update(sem);
        for (HiveSemanticAnalyzerHook hook : saHooks) {
          hook.postAnalyze(hookCtx, sem.getAllRootTasks());
        }
      } else {
        sem.analyze(tree, ctx);
      }
At this point it is clear that by the time the execution pre-hook runs, the choice between MR and Spark has already been made, so setting the conf there cannot work; the switch has to happen in the semantic-analyzer pre-hook instead. The catch is that before sem.analyze() runs the query has only been parsed into an AST, so the input table/partition information is not yet available. One way to solve this is to modify the Hive source code: pass the two arguments of the initial sem.analyze(tree, ctx) call into the semantic-analyzer pre-hook, run analyze() there first to obtain the inputs, fetch the corresponding sizes from the Hive metastore, and then set the conf to choose between MR and Spark.
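A rough sketch of that idea follows. It assumes a patched Hive in which the semantic-analyzer hook context also carries the BaseSemanticAnalyzer and the parse Context (the PatchedHookContext type and its getters are hypothetical additions, not part of stock Hive), and it deliberately leaves the mr-vs-spark policy open:

import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class EngineSwitchSemanticHook extends AbstractSemanticAnalyzerHook {
  @Override
  public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
      throws SemanticException {
    // Hypothetical patched context exposing the analyzer and ctx from Driver.compile().
    PatchedHookContext patched = (PatchedHookContext) context;
    BaseSemanticAnalyzer sem = patched.getSemanticAnalyzer();

    // Run the analysis early so that sem.getInputs() is populated.
    sem.analyze(ast, patched.getContext());

    long totalSize = 0L;
    for (ReadEntity input : sem.getInputs()) {
      // Look up totalSize/rawDataSize for each input table/partition in the metastore (omitted).
    }

    // Decide mr vs spark from totalSize; the exact policy is not spelled out in this post.
    String engine = chooseEngine(totalSize);
    context.getConf().set("hive.execution.engine", engine);
    return ast;
  }

  private String chooseEngine(long totalSize) {
    return "spark"; // placeholder policy
  }
}

The hook itself would then be registered through hive.semantic.analyzer.hook, the SEMANTIC_ANALYZER_HOOK ConfVar that Driver.compile() reads in the snippet above.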

1 comment:

  1. Do you use Spark when the data volume is large, or when it is small? How do you make the split based on data volume?
