Spark开发-SparkSQL执行过程和SQL相关

SparkSQL执行的场景

1.一种是直接写sql语句,这个需要有元数据库支持,例如Hive等库
 2.另一种是通过Dataset/DataFrame编写Spark应用程序,即 通过通过Dataset/DataFrame提供的APIs进行计算

Spark的执行过程

0.用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文
1.SparkContext  向集群资源管理器注册并申请资源,资源管理器分配资源,并启动Executor
2.SparkContext 根据Job构建基于Stage的DAG,DAG根据血缘的宽窄依赖划分Stage,并提交Stage给任务管理器TaskSchedule,生成由Task 
3.Executor经过SparkContext向TaskScheduler申请Task,并执行Task。Task执行结束后,释放资源
4.SparkContext收集结果给Driver,运行结束后

SparkSQL执行过程

SQL 转换为 RDD  
  1.逻辑计划阶段
   逻辑计划阶段会将用户所写的 SQL 语句转换成 逻辑算子树 
   分别对应未解析的逻辑算子树( Unresolved LogicalPlan ,仅仅是数据结构,不包含任何数据信息等)、
          解析后的逻辑算子树( Analyzed LogicalPlan ,节点中绑定各种信息)和
	      优化后的逻辑算子树( Optimized LogicalPlan ,应用各种优化规则对一些低效的逻辑计划进行转换) 。
  2.物理计划阶段也包含 3 个子阶段:
     首先,根据逻辑算子树,生成物理算子树的列表 Iterator[PhysicalPlan] (同样的逻辑算子树可能对应多个物理算子树);
     然后,从列表中按照一定的策略选取最优的物理算子树( SparkPlan);
     最后,对选取的物理算子树进行提交前的准备工作,例如,确保分区操作正确、物理算子树节点
     重用、执行代码生成等,得到“准备后”的物理算子树( Prepared SparkPlan )
  注意: SQL解析到执行之前的步骤,都是在Driver端完成
    逻辑计划阶段的详情
     1.:parser 基于Antlr 4 框架对 sql解析,生成抽象语法树。AST是源代码语法结构的一种抽象表示。它以树状的形式表现编程语言的语法结
     然后将antlr的tree转成LogicPlan 逻辑计划
 2.analyzer;通过分析器,结合Catalog,把logical plan和实际的数据绑定起来
 3.Optimizer:logical plan优化,基于规则的优化;优化规则参考Optimizer,优化执行器RuleExecutor

Spark编译过程

 逻辑计划
      Unresolved Logical Plan
      resolved logical plan
      Optimizated logical plan
 物理计划

Spark SQL 核心类

  上述不同阶段对应的
  SparkSglParser类、Analyzer类、Optimizer类和SparkPlanner类等,
  最后封装成一个QueryExecution对象
解析:源码结构
- Catalyst (sql/catalyst) - An implementation-agnostic framework for
  manipulating trees of relational operators and expressions.
   实现词法分析器Lexer 以及语法分析器Parser
   Antlr4         SQL-- AST
   AstBuilder  -- Unsolved  LogicalPlan
                  Analyzed  LogicalPlan
                  Optimized LogicalPlan
1- Execution (sql/core) - A query planner / execution engine for  translating Catalyst's logical query plans into Spark RDDs.  
   This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
2- Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that 
allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes.
There are also wrappers that allows users to run queries that include Hive UDFs, UDAFs, and UDTFs.
3- HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.
  ANTLR4自动生成Spark SQL语法解析器Java代码SqlBaseParser类 SqlBaseLexer和SqlBaseParser均是使用ANTLR 4自动生成的Java类。 使用这两个解析器将SQL语句解
  然后在parsePlan中,使用 AstBuilder( AstBuilder.scala)将ANTLR 4语法树结构转换成catalyst表达式逻辑计划logical plan

Catalyst

 Catalyst中涉及的重要概念和数据结构,主要包括InternalRow体系、TreeNode体系和Expression体系
  InternalRow 表示一行行的数据
  TreeNode体系:
     Expression: 表达式体系即是子类也是一个体系
     QueryPlan: LogicalPlan SparkPlan
  数据类型体系

logicalplan和SparkPlan

logical plan 分为BinaryNode LeafNode UnaryNode 以及其他
  UnaryNode: 一元节点,即只有一个子节点
  BinaryNode:二元节点,即有左右子节点的二叉节点
  LeafNode:  叶子节点,没有子节点的节点。
SparkPlan 根据操作的子节点类型主要分为三种trait: BinaryExecNode/LeafExecNode/UnaryExecNode 和其他类型plan
元数据和指标系统
whole stage codegen : 通过全流式代码生成技术在运行时动态生成代码,并尽量将所有的操作打包到一个函数中

SparkSQL Join类型

1.SparkSQL语法:
   Join的表情况:是否有重复数据,过滤等
   Join大致包括三个要素:  Join方式、Join条件 以及过滤条件
2.了解底层常用的Join算法以及各自的适用场景
SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join
   BroadcastHashJoin
   shuffleHashJoin
   SortMergeJoin
   其他: BroadcastNestedLoopJoin  CartesianProductJOin
hash join算法就来自于传统数据库,而shuffle和broadcast是分布式情况下的场景
     shuffle    :分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点
     broadcast  :将小表广播分发到大表所在的所有主机
SparkSQL对两张大表join采用了全新的算法-  sort-merge join
   shuffle- sort -merge
     spark的shuffle实现都适用sort-based shuffle算法,因此在经过shuffle之后partition数据都是按照key排序的

SparkSQL配置

  Spark.sql.antoBroadcastJoinThreshold
  spark.sql.join.preferSortMergeJOin
   ……
 读取数据:
   DataSource表 即Spark的表
    spark.sql.files.maxPartitionBytes
   Hive 表情况
    spark.sql.combine.hive.input.splites.enbale=true
    mapreduce.input.fileinputformart.split.maxsize
  落表:
     spark.sql.adaptive.repaartition.enable=true
  说明: limit运行反而很慢
        可以通过查看执行计划,这种情况多发生在分区表中,多是执行了在每个分区取数,然后在这些分区中再取限制的数据
      分布式的情况,有时候和理解的不一致,这时候通过分析执行计划,可以有效的减少 想当然和 实际上的分歧和错误

参考

 SparkSQL – 有必要坐下来聊聊Join http://hbasefly.com/2017/03/19/sparksql-basic-join/
原文地址:https://www.cnblogs.com/ytwang/p/13665200.html