RDD转换为DataFrame【反射/编程】

写在前面
主要是加载文件为RDD,再把RDD转换为DataFrame,进而使用DataFrame的API或Sql进行数据的方便操作

简单理解:DataFrame=RDD+Schema

贴代码

package february.sql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Description:  ============Spark SQL支持两种不同的方法将现有RDD转换为Datasets数据集==============
  *
  *
  * (1) 反射 case class   前提:事先需要知道你的字段,字段类型
  * (2) 编程              事先不知道有哪几列
  *   ****  优先选择第一种 ****
  *
  * @Author: 留歌36
  * @Date: 2019/2/25 18:41
  */
object DataFrameRDDApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
                            .appName(this.getClass.getSimpleName)
                            .master("local[2]")
                            .getOrCreate()
    // 方法一:反射
//    inferReflection(spark)

    // 方法二:编程
    program(spark)

    spark.stop()

  }

  /**
    * 编程的方式
    * @param spark
    */
  private def program(spark: SparkSession) = {
    val textFile = spark.sparkContext.textFile("f:\infos.txt")

    val infoRdd = textFile.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val DF =spark.createDataFrame(infoRdd, structType)
    DF.printSchema()

    DF.show()

  }


  /**
    * 反射的方式
    * @param spark
    */
  private def inferReflection(spark: SparkSession) = {
    // RDD ==> DataFrame  rdd.toDF()
    val textFile = spark.sparkContext.textFile("f:\infos.txt")
    // split()返回 String[]
    // 注意:需要导入隐式转换
    import spark.implicits._
    val infoDF = textFile.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    // =====基于dataframe的API=======之后的就都是DataFrame 的操作了==============
    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show()

    // ======基于SQL的API===========DataFrame 创建为一张表================
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  //类似java bean实体类
  // 反射的方式,将RDD的 每个字段 与 这里的实体类 进行一一映射
  case class Info(id: Int, name: String, age: Int)


}

更多相关小demo:每天一个程序:https://blog.csdn.net/liuge36/column/info/34094

原文地址:https://www.cnblogs.com/liuge36/p/10443968.html