Spark SQL (3): Parser to Unresolved LogicalPlan


Spark SQL's Parser, simply put, turns a SQL string into a tree of operators; Spark SQL uses ANTLR4 to do this.
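Before diving into the source, it helps to see what this stage produces. Below is a minimal sketch, assuming a live SparkSession named spark; queryExecution.logical exposes the plan exactly as the parser returned it, before the analyzer resolves anything.

// Register a temp view named NAME so the example query has something to read.
spark.range(20).selectExpr("id AS AGE").createOrReplaceTempView("NAME")

val df = spark.sql("SELECT * FROM NAME WHERE AGE > 10")
// queryExecution.logical is the parser's output: an unresolved LogicalPlan.
println(df.queryExecution.logical)
// df.explain(true) additionally prints the analyzed, optimized and physical plans.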

When the spark.sql() method is executed, it calls

Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
which in turn calls:
/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  astBuilder.visitSingleStatement(parser.singleStatement()) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}
The detailed parsing work happens in parse():
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  logDebug(s"Parsing command: $command")
  // Debug lines added by the post's author to trace the raw char stream.
  val oo = CharStreams.fromString(command)
  logInfo(s"Parsing command: $oo")

  val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  lexer.removeErrorListeners()
  lexer.addErrorListener(ParseErrorListener)

  val tokenStream = new CommonTokenStream(lexer)
  val parser = new SqlBaseParser(tokenStream)
  parser.addParseListener(PostProcessor)
  parser.removeErrorListeners()
  parser.addErrorListener(ParseErrorListener)

  try {
    try {
      // First, try parsing with the potentially faster SLL mode.
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      toResult(parser)
    } catch {
      case e: ParseCancellationException =>
        // If that fails, rewind the input stream and retry in LL mode.
        tokenStream.seek(0)
        parser.reset()
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        toResult(parser)
    }
  } catch {
    case e: ParseException if e.command.isDefined =>
      throw e
    case e: ParseException =>
      throw e.withCommand(command)
    case e: AnalysisException =>
      val position = Origin(e.line, e.startPosition)
      throw new ParseException(Option(command), e.message, position, position)
  }
}

Inside parse(), ANTLR4 does the actual parsing. It first tries the potentially faster SLL prediction mode; if that throws a ParseCancellationException, the token stream is rewound and parsing is retried in the slower but fully general LL mode. Once parsing succeeds, the call to singleStatement() on the SqlBaseParser has built the entire parse tree.

AstBuilder.visitSingleStatement is then called to walk the nodes recursively and return the resulting LogicalPlan. This step relies mostly on the code ANTLR4 generates and on the visitor pattern: each node is visited in turn and returns its own result. The main implementation lives in the AstBuilder class, which extends the generated SqlBaseBaseVisitor.

Suppose we have the following SQL:

SELECT * FROM NAME WHERE AGE > 10 

After ANTLR4 parses it, the tree has roughly the following shape (a simplified sketch using the generated context-class names from Spark's SqlBase.g4; some intermediate nodes are omitted):
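SingleStatementContext
└── StatementDefaultContext
    └── QueryContext
        └── QueryTermDefaultContext
            └── QueryPrimaryDefaultContext
                └── QuerySpecificationContext
                    ├── SELECT *                        (namedExpressionSeq)
                    ├── FromClauseContext               (FROM NAME)
                    └── where: BooleanExpressionContext (AGE > 10)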

In the AstBuilder.visitSingleStatement method:

override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
  val statement: StatementContext = ctx.statement
  // Debug helper added by the post's author to dump the parse tree.
  printRuleContextInTreeStyle(statement, 1)
  visit(ctx.statement).asInstanceOf[LogicalPlan]
}

The input here is a SingleStatementContext, i.e. the root node of the tree above. From it we get the child StatementContext, which, as the tree shows, is actually a StatementDefaultContext. Now look at its accept method:

@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
  if (visitor instanceof SqlBaseVisitor) return ((SqlBaseVisitor<? extends T>) visitor).visitStatementDefault(this);
  else return visitor.visitChildren(this);
}

Most generated nodes are implemented like this: accept dispatches to the matching visitXxx method on the visitor. For certain special nodes such as FromClauseContext, AstBuilder overrides the handling logic; everything else falls back to visiting the children. So AstBuilder builds the LogicalPlan simply by traversing the whole tree.
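The "fall back" path is visitChildren, which AstBuilder overrides roughly as follows (a sketch based on the Spark 2.x source): a node with exactly one child passes that child's result straight through, which is why the long single-child chains in the tree above collapse into a single LogicalPlan.

protected override def visitChildren(node: RuleNode): AnyRef = {
  if (node.getChildCount == 1) {
    // A single-child node contributes nothing of its own:
    // forward the child's result unchanged.
    node.getChild(0).accept(this)
  } else {
    null
  }
}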

When the traversal of the tree above reaches the QuerySpecification node, the actual construction of the LogicalPlan is triggered:

 /**
   * Create a logical plan using a query specification.
   */
  override def visitQuerySpecification(
      ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
    val from = OneRowRelation().optional(ctx.fromClause) {
      visitFromClause(ctx.fromClause)
    }
    withQuerySpecification(ctx, from)
  }

visitFromClause examines the Relation nodes under the FromClause node; if there are join operations, they are parsed here as well, producing the FROM subtree, as sketched below.
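A simplified sketch of that logic (condensed from the Spark 2.x source, omitting lateral views and pivots): the comma-separated relations of the FROM clause are folded left to right into inner joins, and explicit JOIN ... ON ... clauses are applied by withJoinRelations.

override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
    val right = plan(relation.relationPrimary)
    // For the first relation `left` is null, so optionalMap keeps `right`
    // as-is; every further relation is combined with an implicit inner join.
    val join = right.optionalMap(left)(Join(_, _, Inner, None))
    withJoinRelations(join, relation) // handle explicit JOIN ... ON ... clauses
  }
}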

After the FROM subtree is built, withQuerySpecification carries it along and parses the WHERE clause, aggregations, expressions and the other child nodes, producing the tree for the whole parsed statement.

/**
   * Add a query specification to a logical plan. The query specification is the core of the logical
   * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
   * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
   *
   * Note that query hints are ignored (both by the parser and the builder).
   */
  private def withQuerySpecification(
      ctx: QuerySpecificationContext,
      relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
    import ctx._

    // WHERE
    def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
      Filter(expression(ctx), plan)
    }

    // Expressions.
    val expressions = Option(namedExpressionSeq).toSeq
      .flatMap(_.namedExpression.asScala)
      .map(typedVisit[Expression])

    // Create either a transform or a regular query.
    val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
    specType match {
      case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
        // Transform

        // Add where.
        val withFilter = relation.optionalMap(where)(filter)

        // Create the attributes.
        val (attributes, schemaLess) = if (colTypeList != null) {
          // Typed return columns.
          (createSchema(colTypeList).toAttributes, false)
        } else if (identifierSeq != null) {
          // Untyped return columns.
          val attrs = visitIdentifierSeq(identifierSeq).map { name =>
            AttributeReference(name, StringType, nullable = true)()
          }
          (attrs, false)
        } else {
          (Seq(AttributeReference("key", StringType)(),
            AttributeReference("value", StringType)()), true)
        }

        // Create the transform.
        ScriptTransformation(
          expressions,
          string(script),
          attributes,
          withFilter,
          withScriptIOSchema(
            ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))

      case SqlBaseParser.SELECT =>
        // Regular select

        // Add lateral views.
        val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)

        // Add where.
        val withFilter = withLateralView.optionalMap(where)(filter)

        // Add aggregation or a project.
        val namedExpressions = expressions.map {
          case e: NamedExpression => e
          case e: Expression => UnresolvedAlias(e)
        }
        val withProject = if (aggregation != null) {
          withAggregation(aggregation, namedExpressions, withFilter)
        } else if (namedExpressions.nonEmpty) {
          Project(namedExpressions, withFilter)
        } else {
          withFilter
        }

        // Having
        val withHaving = withProject.optional(having) {
          // Note that we add a cast to non-predicate expressions. If the expression itself is
          // already boolean, the optimizer will get rid of the unnecessary cast.
          val predicate = expression(having) match {
            case p: Predicate => p
            case e => Cast(e, BooleanType)
          }
          Filter(predicate, withProject)
        }

        // Distinct
        val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
          Distinct(withHaving)
        } else {
          withHaving
        }

        // Window
        val withWindow = withDistinct.optionalMap(windows)(withWindows)

        // Hint
        hints.asScala.foldRight(withWindow)(withHints)
    }
  }

The operator tree obtained at this point is still unbound to any catalog or schema; it is the unresolved LogicalPlan.
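For the example query above, printing the parser's output gives roughly the following (a sketch; the leading single quote is how Spark marks unresolved nodes, and NAME has not yet been looked up in any catalog):

'Project [*]
+- 'Filter ('AGE > 10)
   +- 'UnresolvedRelation `NAME`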

To summarize: understanding this step comes down to the grammar defined in Spark's SqlBase.g4 file, plus the default code ANTLR generates for the visitor pattern; most of the generated code follows the same pattern, and AstBuilder then overrides the logic for certain nodes. To study it hands-on, open SqlBase.g4 in the Spark source, open the ANTLR Preview panel in IDEA, type a SQL statement, select the singleStatement rule in SqlBase.g4, right-click and choose Test Rule. The tree it renders is exactly the tree AstBuilder traverses top-down. Some nodes are not overridden by AstBuilder and simply delegate to their children; others are overridden and worth reading in detail, chiefly the ones covered above: QuerySpecification, FromClauseContext, booleanDefault, namedExpression, and so on.
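For orientation, the entry rule of the grammar is tiny (an excerpt from Spark's SqlBase.g4; it is what produces the SingleStatementContext root node seen earlier):

singleStatement
    : statement EOF
    ;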

Original post: https://www.cnblogs.com/ldsggv/p/13380370.html