hive 初始化运行流程

CliDriver 

初始化过程 

CliDriver.main  是 Cli 的入口 

(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val  用于增加或者覆盖hive/hadoop配置,设置到System的属性中。 
(2) 配置log4j,加载hive-log4j.properties里的配置信息。 
(3)创建一个HiveConf,设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。 
(4) 创建一个CliSessionState(SessionState) 
(5) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。 
(6) –hiveconf var=val 设置的属性设置到HiveConf中。 
(7) ShimLoader,load  HadoopShims 
(8) CliSessionState设置到SessionState中,创建一个hive_job_log_ xxx文件(用于记录Hive的一些操作信息)保存到SessionState的hiveHist 。 
(9) 创建CliDriver. 
(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否 有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就 执行这两个文件中的命令。 
(11) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。 
(12)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。 


读取用户输入hivesql,处理运行过程 

CliDriver.processLine   去掉命令末尾的;, 

CliDriver.processCmd 

Split命令,分析第一个单词: 
(1)如果是quit或者exit,不区分大小写,退出。 
(2)source,执行文件中的HiveQL 
(3)!,执行命令,如!ls,列出当前目录的文件信息。 
(4)list,列出jar/file/archive。 
(5)如果是其他,则生成调用相应的CommandProcessor处理。 

CommandProcessor 

CommandProcessorFactory
 
(1)set           SetProcessor,设置修改参数,设置到SessionState的HiveConf里。 
(2)dfs           DfsProcessor,使用hadoop的 FsShell运行hadoop的命令。 
(3)add         AddResourceProcessor  添加到SessionState的resource_map里,运行提交job的时候会写入 Hadoop的Distributed Cache。 
(4)delete    DeleteResourceProcessor从SessionState的resource_map里删除。 
(5)其他       Driver 


Driver 
Driver.run(String command) // 处理一条命令 

int ret = compile(command);  // 分析命令,生成Task。 
ret = execute();  // 运行Task。 


(1)词法分析,生成AST树,ParseDriver完成。 
(2)分析AST树,AST拆分成查询子块,信息记录在QB,这个QB在下面几个阶段都需要用到,SemanticAnalyzer.doPhase1完成。 
(3)从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。 
(4)生成逻辑执行计划,SemanticAnalyzer.genPlan完成。 
(5)优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。 
(6)生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。 
(7)物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。 
(8)执行生成的物理计划,获得结果。 
(1)~(7)在Driver的compile中完成。 
(8)在Driver的execute中完成,在执行阶段一个一个Task运行,不会改变物理计划。 
整个Hive代码架构还不够清晰,传递的上下文信息比较臃肿,比较难理解。
 


Driver.compile 

Driver.compile(String command) // 处理一条命令 

(1) Context 
      ctx = new Context(conf); // private Context ctx; Driver的一个字段变量 
(2) Parser(antlr):HiveQL->AbstractSyntaxTree(AST) 
      ParseDriver pd = new ParseDriver(); 
      ASTNode tree = pd.parse(command, ctx); 
(3) SemanticAnalyzer 
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); 
      // Do semantic analysis and plan generation 
      sem.analyze(tree, ctx); 
      // 说明:如果有SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null)这个hook,那 么会在sem.analyze(tree, ctx);前执行hook.preAnalyze(hookCtx, tree);在sem.analyze(tree, ctx);后执行hook.postAnalyze(hookCtx, sem.getRootTasks(), sem.getFetchTask()); 这里的hook有多个 
(4) QueryPlan 
   plan = new QueryPlan(command, sem); 
(5) Schema 
   schema = getSchema(sem, conf); // / get the output schema 

Parser是: 
使用antlr,语法规则是 Hive.g 
ql/src/java目录下面的:org.apache.hadoop.hive.ql. ParseDriver 

SemanticAnalyzerFactory/SemanticAnalyzer 
多种SemanticAnalyzer: 
(1)ExplainSemanticAnalyzer(会调用SemanticAnalyzer获得相应信息) 
  explain 某条HiveSQL时调用 
  EXPLAIN [EXTENDED] query 
(2)LoadSemanticAnalyzer 
  LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]  
(3)DDLSemanticAnalyzer 
SHOW TABLES、DROP TABLE、DESC TABLE等时 
(4)FunctionSemanticAnalyzer 
CREATE/DROP  FUNCTION 
(5)SemanticAnalyzer 
select 等 
(6)其他SemanticAnalyzer,Hive-0.6、0.7只有上面5种,trunk里面针对新功能新特性添加了相应的SemanticAnalyzer 


Driver.execute 

Driver.execute() // 运行命令生成的Task(一个或多个) 

    (1) Get all the pre execution hooks and execute them. 
    (2)  把root Tasks 加到 runnable队列 
    (3) 运行该SQL产生的Task  
        while (running.size() != 0 || runnable.peek() != null) { //task running队列不为空,或者runnable不为空。 
            while (runnable.peek() != null && running.size() < maxthreads) {//runnable队列不为空 
                   Task<? extends Serializable> tsk = runnable.remove();//删除runnable队列头的task 
                   launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); //运行Task,如果打开了并发提交会通过新的线程去运行Task,否则就是主线程运行Task,直到Task运行完毕,把Task对应的 TaskResult和TaskRunner加入running队列 
             } 
  
             //从running队列中获取一个运行完的Task 
             TaskResult tskRes = pollTasks(running.keySet()); 
             TaskRunner tskRun = running.remove(tskRes); 
             Task<? extends Serializable> tsk = tskRun.getTask(); 

              int exitVal = tskRes.getExitVal(); //task完成的状态 
              if (exitVal != 0) { //Task失败 
                    获得task的backupTask 
                    有backup,把backup加入到runnable队列,没有就需要返回return 9;,表示HiveSQL运行失败。而不是System.exit(9); 
              } 
              
               // 把task的ChildTasks加入到runnable队列。 
        } 
   (4) Get all the post execution hooks and execute them. 

Driver.launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName, 
      Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt){ 

    tsk.initialize(conf, plan, cxt); // Task初始化 
    TaskResult tskRes = new TaskResult(); // task信息:是否成功执行,是否运行 
    TaskRunner tskRun = new TaskRunner(tsk, tskRes); 

    // Launch Task 
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { //并发提交打开并且这个是MR task,在另一个线程中执行。 
      // Launch it in the parallel mode, as a separate thread only for MR tasks 
      tskRun.start(); 
    } else { 
      tskRun.runSequential(); // 主线程执行 
    } 
    running.put(tskRes, tskRun); // 放入running队列 


Task: 
(1) ConditionalTask 
(2) CopyTask 
(3) DDLTask 
(4) ExecDriver 
      (5) MapRedTask 
(6) ExplainTask 
(7) FetchTask 
(8) FunctionTask 
(9) MapredLocalTask 
(10) MoveTask   

核心之一:SemanticAnalyzer 
ql/src/java目录下面的: org.apache.hadoop.hive.ql.SemanticAnalyzer 
  
SemanticAnalyzer. analyzeInternal(ASTNode ast) 

        // analyze create table command 
        if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { //带有create 
            isCreateTable = true; 
            // if it is not CTAS, we don't need to go further and just return 
            if ((child = analyzeCreateTable(ast, qb)) == null) { // create-table-as-select 返回查询子树 
regular create-table or create-table-like statements 返回null 
                 return; 
            } 
        } 

doPhase1(child, qb, initPhase1Ctx());//分析AST树 
  
getMetaData(qb); //从数据库中获得表的信息 
  
Operator sinkOp = genPlan(qb);// AST-〉operator trees 
  
Optimizer optm = new Optimizer(); 
pCtx = optm.optimize();// 优化  operator trees  -〉operator trees 
  
       // At this point we have the complete operator tree 
       // from which we want to find the reduce operator 
       genMapRedTasks(qb); // operator trees-〉MapReduce Tasks 


Hive原理分析: 
(1) 从HQL语句到AST的转化过程是很机械,使用ANTLR,根据Hive.g的语法分析规则,生成AST。
(2) 从AST转化到QB,再到DAG图不是那么很容易明白,所以需要理清楚一下。 
AST-〉QB就是把AST里面的一些信息和子查询分析出来,如所有涉及的表和表的别名(如果这条HQL查询语句中没有为表取别名,那么取别名为 表名)的对应关系保存到QB的aliasToTabs,目标表(目标表即输出的table)的子AST。where子句,select子句,join子 句,等一些子查询的AST分析出来,保存起来。 
QBMetaData是查询相关的元数据信息,如所有源表(源表即从哪些表取得输入数据)到该表的Table关联。表的Table用来记录 Table有哪些字段,各个字段的类型,表的分隔符等等信息。目标表名(存放输出结果的表)到表的Table的关联,目标表可以有多个,因为输出可能是写 入多个表。

QB-〉DAG图的转化过程。 
从QB生成operator,从生成的QB中的子查询生成Operator并保存记录它们之间的父子关系,还可能插入一些operator,这些operator是一些必要的辅助功能。 
后面需要对这个DAG图,即operator图进行拆分,生成一些mapreduce作业(job),如有一个map阶段可能有多个 operator,完成这些operator的功能,如某个Job的map执行多个operator,TableScanOperator是第一个 operator,从读取一个表的数据开始(一条一条记录,record),在接着可能就是跟据where生成的 operator(FilterOperator),过滤哪些不符合规则的记录(record,key/value),在接着是执行根据select生成 的Select Operator(该operator选择仅需要的字段,过滤无关的字段,从而减少中间数据),最后是一个Reduce Output Operator,该operator完成map的输出,生成中间key和value。 
作业的reduce也是可以执行多个operator的。 

从QB生成的Operator里面有父子关系,生成mapreduce时,会对这个具有父子关系的operator图进行切分,生成一个个阶段,有些阶段是mapreduce作业,这些作业执行多个operator的功能。 

ReduceSinkOperator是map的最后一个Operator,因为该operator需要生成一个map的输出,即输出key和输出value。 

生成Operator树的过程:SemanticAnalyzer.genPlan(QB qb) 
(1) 子查询必须有一个别名即alias,遍历所有的子查询,出现多个子查询在Join时出现,join两边的表都是来自子查询。 
(2) 遍历所有的源表,出现多个在join时出现。 
(3)处理join,在on条件中的过滤条件会推到join前即ReduceSinkOperator前,如果是一个join,那么先生成两个ReduceSinkOperator,然后再生成JoinOperator,join这两个表。 
下面的是在SemanticAnalyzer.genBodyPlan(QB qb, Operator input) 里面完成。 
(4) optimizeMultiGroupBy 
  (4.1)optimizeMultiGroupBy可以优化时走的路径跟下面的不相同。 
     (4.2)    对每个select进行处理,多个select出现在Multi-Group-By Inserts、Multi Table/File Inserts、Dynamic-partition Insert等情况下。multi_insert.q 
         
  
SemanticAnalyzer.doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1) { 
     switch (ast.getToken().getType()) { 
         case HiveParser.TOK_SELECTDI: 
         case HiveParser.TOK_SELECT: 
            (1) 在QBParseInfo里保存select查询子节点,Map<String, ASTNode> destToSelExpr; 
            (2)有hint,在QBParseInfo保存hints子节点,ASTNode hints; 
            (3)处理ast子树的聚合函数,HiveParser.TOK_FUNCTION、TOK_FUNCTIONDI、TOK_FUNCTIONSTAR, 
            (4)处理select中column别名Map<ASTNode, String> exprToColumnAlias; 
            (5)保存聚合函数,QBParseInfo的LinkedHashMap<String, LinkedHashMap<String, ASTNode>> destToAggregationExprs; 
            (6)TOK_FUNCTIONDI,抽取保存distinct聚合函数,HashMap<String, List<ASTNode>> destToDistinctFuncExprs; 
         case HiveParser.TOK_WHERE: 
               在QBParseInfo里保存where查询子节点,HashMap<String, ASTNode> destToWhereExpr; 
         case HiveParser.TOK_DESTINATION: 
               在QBParseInfo里面保存目标地址子节点信息,HashMap<String, ASTNode> nameToDest; 
         case HiveParser.TOK_FROM: 
               只有一个子节点,有四种子节点 
                (1) 一种是表,数据来源于一个表。processTable,处理别名,没有别名表名就是别名。 
                (2)一种是子查询,数据来源于子查询,processSubQuery,子查询必须要有个别名,子查询可能是单独的一个query或者是两个query的union。子查询也是递归调用doPhase1来完成相关分析。 
                (3)一种是视图,数据来源于一个视图,processLateralView 
                (4)一种是Join,数据来源于几个表的join,processJoin,join子节点的孩子节点是两个或者三个,孩子节点可以是表、子查询、join子节点,保存join子查询ASTNode joinExpr; 
         case HiveParser.TOK_CLUSTERBY: 
                在QBParseInfo里保存cluster by查询子节点,HashMap<String, ASTNode> destToClusterby; 
         case HiveParser.TOK_DISTRIBUTEBY: 
               在QBParseInfo里保存distribute by查询子节点,有distribute by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToDistributeby; 
         case HiveParser.TOK_SORTBY: 
               在QBParseInfo里保存sort by查询子节点,有sort by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToSortby; 
         case HiveParser.TOK_ORDERBY: 
               在QBParseInfo里保存order by查询子节点,有order by的时候不能有cluster by,HashMap<String, ASTNode> destToOrderby; 
         case HiveParser.TOK_GROUPBY: 
               在QBParseInfo里保存group by查询子节点,HashMap<String, ASTNode> destToGroupby; 
         case HiveParser.TOK_LIMIT: 
               在QBParseInfo里保存limit查询子节点,HashMap<String, Integer> destToLimit; 
         case HiveParser.TOK_UNION: 
     } 
     
         if (!skipRecursion) { 
      // Iterate over the rest of the children 
      int child_count = ast.getChildCount(); 
      for (int child_pos = 0; child_pos < child_count; ++child_pos) { 
        // Recurse 
        doPhase1((ASTNode) ast.getChild(child_pos), qb, ctx_1);  //递归处理各个孩子节点
      } 
    } 


SemanticAnalyzer.getMetaData(QB qb) { 
         (1)从数据库中获取表的信息,这些表是记录在QB的HashMap<String, String> aliasToTabs;中 
         表的别名和对应的org.apache.hadoop.hive.ql.metadata.Table保存记录在QB的QBMetaData qbm;的HashMap<String, Table> aliasToTable;中。 
         (2)如果有子查询,递归调用getMetaData(QB qb)从数据库获取表的信息    
         (3)获取目的表的信息 
                   目的子节点存储在QBParseInfo的HashMap<String, ASTNode> nameToDest; 
                   目的节点有2种:(3.1)目的是表,表分分区表和非分区表(3.2)目的是本地目录或者hdfs目录,获得设置一个中间临时目录 


SemanticAnalyzer.genPlan(QB qb)
    (1)处理子查询,生成子查询的operator tree 
    (2)遍历source tables,记录保存在QB的HashMap<String, String> aliasToTabs;里面,对每个源表生成一个TableScanOperator,保存到SemanticAnalyzer的 HashMap<TableScanOperator, Table> topToTable;里 
    (3)处理视图 
    (4)处理join 
    (5)genBodyPlan,生成剩下的operator tree. 


SemanticAnalyzer.genBodyPlan(QB qb, Operator input) 
      (1)multi-group by优化 
      (2)遍历所有的destination tables,保存记录在QBParseInfo的Map<String, ASTNode> destToSelExpr;里,从select获得。 
               (2.1)有where语句生成FilterOperator,从QBParseInfo.destToWhereExpr里查询 
               (2.2)有group by或者聚合函数,根据相关配置生成相应operator tree. 
               (2.3)生成SelectOperator,选取相应字段,来自select语句 
               (2.4)有cluster by 或者distribute by或者order by或者sort by生成相应的ReduceSinkOperator和ExtractOperator,如果是order by设置reduce数为1 
               (2.5)分两种情况,qbp是子查询与qbp不是子查询 
                            (2.5.1)是子查询 
                            (2.5.2)不是子查询 
                                  有limit,生成相应的LimitOperator,这里需要分情况,是否需要两个MR 
                                  如果需要进行类型转换则生成相应的SelectOperator,生成FileSinkOperator 


Optimizer.optimize() 



SemanticAnalyzer.genMapRedTasks(QB qb) 



核心之二:MapRedTask 
TaskRunner: 
  public void runSequential() { 
    int exitVal = -101; 
    try { 
      exitVal = tsk.executeTask(); //运行Task.executeTask() 
    } catch (Throwable t) { 
      t.printStackTrace(); 
    } 
    result.setExitVal(exitVal); 
  } 
Task: 
  public int executeTask() { 
      int retval = execute(driverContext); //各个子类实现该方法 
  } 
  protected abstract int execute(DriverContext driverContext); 
这里介绍MapRedTask这个Task. 

MapRedTask: 
public int execute(DriverContext driverContext) { 
(1)  setNumberOfReducers(); // estimate number of reducers   推测reduce个数 
(2)  if (!ctx.isLocalOnlyExecutionMode() && 
          conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { //HiveConf.ConfVars.HADOOPJT不是local,并且 LOCALMODEAUTO("hive.exec.mode.local.auto", true)打开 
//hive.exec.mode.local.auto用于小job自动转换为本地运行,should hive determine whether to run in local mode automatically 
           判断job能否本地运行,目前的判断条件是:(一)输入数据小于等于128M (二)map数小于等于4 (三) reduce数小于等于1,这3个条件都满足,该任务就在本地运行。 
         } 
(3)计算得到 runningViaChild 
      runningViaChild = 
        "local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) || 
        conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD); 
       //如果是本地运行或者通过子进程提交作业,runningViaChild为true 
      (3.1) 如果runningViaChild为false,super.execute(driverContext);  ExecDriver.execute完成task。 
      (3.2) 如果runningViaChild为true,通过子进程完成 
               executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir)); 
               子进程的入口main函数是ExecDriver.main() 



核心之三:ExecDriver 
/home/tianzhao/apache/hive-0.6.0/build/hadoopcore/hadoop-0.19.1/bin/hadoop jar  /home/tianzhao/apache/hive-0.6.0/build/ql/hive-exec-0.6.0.jar org.apache.hadoop.hive.ql.exec.ExecDriver -plan /tmp/hive-tianzhao/hive_2011-05-31_09-30-02_222_4000721282829102058/plan5577504731701425227.xml  -jobconf datanucleus.connectionPoolingType=DBCP 
使用hadoop jar  hive-exec-0.6.0.jar  org.apache.hadoop.hive.ql.exec.ExecDriver提交job给hadoop,作业的信息诸如operator等信息 序列化到了 -plan plan5577504731701425227.xml里面。 
ExecMapper、ExecReducer在configure(JobConf job)运行的时候会反序列化出来。 
hive提交给hadoop的MapReduce作业,map阶段运行ExecMapper,reduce阶段运行ExecReducer。 

add jar/add file/add archive,这些archive、jar和文件会写入 Distributed Cache里面。在MapTask和ReduceTask运行的时候读取调用。  写入Distributed Cache参考ExecDriver。 

ExecDriver使用JobClient提交Job后,定期查看Job的进展情况,Job完成后,调用operator的jobClose()方法。 


hive.exec.plan 
org.apache.hadoop.hive.ql.exec.Utilities.setMapRedWork() 会设置plan的ID 

ExecDriver.execute(DriverContext driverContext) { 
    (1) 创建ScratchDir目录 
    (2) 设置mapper类,reducer类等等 
           job.setMapperClass(ExecMapper.class); 
    (3) 如果有MapredLocalWork,并且不是localMode,那么上传文件到HDFS,该文件加入DistributedCache。场景用于 auto map join,auto map join会产生两个Task:MapredLocalTask+MapRedTask。 MapredLocalTask将小表的数据从hdfs fetch下来,put到一个HashTable,写入到本地的一个文件中。在MapRedTask中把本地的这个文件写入hdfs,add到 DistributedCache,就是当前的部分。这里的小表写入HashTable合并相同的key,只需要在client端做一次,在map端只需 要读取使用即可。加入DistributedCache是为了一个TaskTracker多次运行MapTask使用到一个文件时不需要多次下载,只需一 次下载即可。 
  (4)MapredWork写入hdfs,副本数设置为10,加入DistributedCache 
          Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); 
   (5)创建JobClient 
          JobClient jc = new JobClient(job); 
   (6)运行PrejobHooks 
          runPreJobHooks();  // Call the Pre-job hooks' list 
    (7)提交job 
         orig_rj = rj = jc.submitJob(job); 
     (8)定时检测job时候运行完成 
         private void progress(ExecDriverTaskHandle th) throws IOException { 
                 while (!rj.isComplete()) { 
                        Thread.sleep(pullInterval); // pullInterval默认是 1000L,HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L),通过hive.exec.counters.pull.interval可以设置 
                        updateCounters(th); //获取更新进度信息 
                        String report = " " + getId() + " map = " + mapProgress + "%,  reduce = " + reduceProgress 
          + "%";  // 打印这行进度信息 
                 } 
                 // while循环外,job已经结束 
                 runPostJobHooks(rj); //运行PostJobHooks 
         } 
      (9)清理操作 
      (10)运行Operator的jobClose方法 
            for (Operator<? extends Serializable> op : work.getAliasToWork().values()) { 
                  op.jobClose(job, success, feedBack); 
            } 
            work.getReducer().jobClose(job, success, feedBack); 
      (11)return (returnVal); 返回 



ExecDriver.main( ) 
两个地方调用 
(1)MapRedTask   : 
           在本地执行或者通过子进程提交两种方式下会调用。 
           ExecDriver.main(String[] args) { 
               } else { 
                     MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf); 
                     ExecDriver ed = new ExecDriver(plan, conf, isSilent); 
                     ret = ed.execute(new DriverContext()); 
               } 
           } 
(2)MapredLocalTask  : 
           启动子进程执行 
           ExecDriver.main(String[] args) { 
                     if (localtask) { 
                         memoryMXBean = ManagementFactory.getMemoryMXBean(); 
                         MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf); 
                         MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent); 
                         ret = ed.executeFromChildJVM(new DriverContext());  // 从hdfs上面获取小表数据,写到HashTable中,然后dump到本地的一个文件。 
                     } 
           } 

......
原文地址:https://www.cnblogs.com/java20130722/p/3206979.html