SparkSQL's Reflection Mechanism and Custom DataFrame Creation

Reflection Mechanism

1. RDD[Person] ----(case class / reflection)----> DataFrame[Row] ----> DataSet[Person]

     RDD        DF                           DS
     Person     ["name","age","address"]     {Person: ("name","age","address")}
     Person     ["name","age","address"]     {Person: ("name","age","address")}
     Person     ["name","age","address"]     {Person: ("name","age","address")}
     Person     ["name","age","address"]     {Person: ("name","age","address")}
     Person     ["name","age","address"]     {Person: ("name","age","address")}
2. RDD --> DataFrame --> DataSet (a sketch of these conversions follows this list)
   a. RDD --> DataFrame:      sparksession.createDataFrame
   b. RDD --> DataSet:        sparksession.createDataset
   c. DF, DS --> RDD:         df.rdd --> RDD[Row]; ds.rdd --> RDD[Person]
   d. DataFrame --> DataSet:  df.as[Person] (an Encoder for Person is needed)
   e. DataSet --> DataFrame:  ds.toDF()
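A minimal, runnable sketch of the reflection-based path and of the conversions above (the Person case class, the sample rows, and the app name are assumptions for illustration):

import org.apache.spark.sql.SparkSession

// Case class whose fields ("name", "age", "address") become the schema via reflection
case class Person(name: String, age: Int, address: String)

object ReflectionDemo {
  def main(args: Array[String]): Unit = {
    val sparksession = SparkSession.builder().appName("reflectionDemo").master("local").getOrCreate()
    import sparksession.implicits._

    // RDD[Person] built from in-memory sample data (assumed for the example)
    val rdd = sparksession.sparkContext.parallelize(Seq(
      Person("zhangsan", 20, "beijing"),
      Person("lisi", 22, "shanghai")))

    val df = rdd.toDF()        // RDD[Person] --> DataFrame, schema inferred by reflection
    val ds = df.as[Person]     // DataFrame   --> DataSet[Person]
    val rowRdd = df.rdd        // DataFrame   --> RDD[Row]
    val personRdd = ds.rdd     // DataSet     --> RDD[Person]

    df.printSchema()
    ds.show()
  }
}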

Creating a DataFrame with a Custom Schema

  There are three steps in total:

    1. Create an RDD of Rows from the original RDD.

    2. Create a StructType that matches the structure of the Rows in step 1; this StructType is the schema of the RDD.
    3. Create the DataFrame with the createDataFrame method provided by SparkSession, passing it the Row RDD and the schema.

  Example:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CustomSchemaDemo {
  def main(args: Array[String]): Unit = {
    val sparksession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
    import sparksession.implicits._
    // Input file: one user per line, fields separated by spaces, e.g. "zhangsan 20 beijing"
    val rdd = sparksession.sparkContext.textFile("file:///d:/测试数据/users.txt")

    // Step 1: create an RDD of Rows from the original RDD
    val rdd_row = rdd.map(x => x.split(" ")).map(x => Row(x(0), x(1).toInt, x(2)))

    // Step 2: create a StructType matching the structure of the Rows; it is the schema of the RDD
    // Alternatively, generate the schema from a string such as "name age address":
    //   val fields = "name age address".split(" ")
    //     .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val fields = List(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("address", StringType, nullable = true)
    )
    val schema = StructType(fields)

    // Step 3: create the DataFrame via SparkSession.createDataFrame, passing the Row RDD and the schema
    val rdd_df = sparksession.createDataFrame(rdd_row, schema)
    rdd_df.show()
  }
}
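As a possible follow-up (a sketch; the view name and the query are assumptions), the resulting DataFrame can be registered as a temporary view and queried with SQL. These lines would go inside main, right after rdd_df.show(), and they lead into the execution flow described next:

    // Register the DataFrame as a temporary view and query it with SQL
    rdd_df.createOrReplaceTempView("users")
    sparksession.sql("select name, age from users where age > 18").show()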

SparkSQL Execution Flow

  1. SQL execution process

    select f1,f2,f3 from table_name where condition

    Step 1 - Parse:
      First, scan the SQL statement for keywords (select, from, where, group by, etc.) according to the SQL grammar and identify the projection, the DataSource, and the filter.
    Step 2 - Bind:
      Using the results of the parse stage (projection, DataSource, filter), check that the DataSource and the fields are valid; if the check fails, an exception is thrown.
    Step 3 - Optimize:
      Based on the statistics the database has collected about the current DataSource, apply the corresponding optimizations.
    Step 4 - Execute:
      Start physical execution and turn the logical plan into the corresponding tasks.
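  In Spark SQL these stages correspond roughly to the parsed logical plan, the analyzed logical plan, the optimized logical plan, and the physical plan, all of which can be printed with explain(true). A minimal sketch, continuing the users view assumed above:

    // Prints the parsed/analyzed/optimized logical plans and the physical plan
    sparksession.sql("select name, age from users where age > 18").explain(true)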

  2. The essence of the execution plan: it can be viewed as a tree; the tree nodes hold the plan information, and Rule objects are applied to those nodes.

      SparkSQL tree nodes fall into several categories:

        a. Unary nodes: filter, count, etc.
        b. Binary nodes: join, etc.
        c. Leaf nodes: loading external data, etc.
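      A small sketch that exercises all three node kinds (the orders view, its columns, and the query are assumptions): the scans of the two views are leaf nodes, the where clause becomes a unary Filter node, and the join is a binary node; queryExecution exposes the resulting plan trees.

        // Assumes a second temp view "orders"(user_name, amount) has been registered
        val plan = sparksession.sql(
          """select u.name, o.amount
            |from users u join orders o on u.name = o.user_name
            |where u.age > 18""".stripMargin)
        println(plan.queryExecution.optimizedPlan)  // optimized logical plan tree
        println(plan.queryExecution.executedPlan)   // physical plan tree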

Original source: https://www.cnblogs.com/lyr999736/p/10224676.html