spark 系列之三 RDD,DataFrame与DataSet之间的转化

本篇介绍 RDD,DataFrame与DataSet之间的转化

在Object中构建 SparkSession

object SparkRDD_DF {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("DatasetAPP")
      .master("local")
      .getOrCreate()

    val rd = new Rdd2DF(sparkSession)
    rd.RDD2DF_1()
    rd.RDD2DF_2()
  }
}

第一种方式:

这种方式适合你再体验已经知道了数据的字段类型和字段名称的情况下。

需要先提前构建一个 Info的 case class

case class Info(id: Int, name: String, age: Int)

然后

  def RDD2DF_1():Unit ={
    //RDD ==> DataFrame 方式一
    val sparkSession:SparkSession = this.sparkSession
    import sparkSession.implicits._
    sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
      .map(_.split(","))
      .map(line => Info(line(0).toInt, line(1), line(2).toInt))
      .toDF()
      .show()
  }

第二种方式:

适合你提前不知道数据的字段,以及字段类型

  def RDD2DF_2():Unit ={
    //RDD ==> DataFrame 方式二
    val peopleRDD = sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
      .map(_.split(","))
      .map(line => Row(line(0).toInt, line(1), line(2).toInt))


    val structType =  StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    sparkSession.createDataFrame(peopleRDD, structType).show()
  }

DataFrame 与DataSet之间的转化

object SparkDataset {
  case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkDataset").master("local").getOrCreate()
    //csv文件路径
    val path = "file:///D:/software_download/spark_text/sales.csv"
    //导入隐式转换
    import spark.implicits._
    //spark如何解析csv文件(外部数据源功能)
    val csv_dataframe = spark.read
                             .option("header","true")
                             .option("inferSchema","true").csv(path)

    //把csv_dataframe转换成Dataset
    //csv_dataframe.select("amountPaid","itemId").show()

    val csv_dataset = csv_dataframe.as[Sales]
    csv_dataset.map(line => (line.itemId,line.amountPaid)).show
    spark.stop()
  }
}

DataFrame和RDD 的简单使用,前面已经介绍过。

我们知道 通过DataFrame 可以方便的使用一些算子,以及把DataFrame注册成临时表,方便我们使用sql语句进行查询,大大降低了学习的成本

那么既生瑜,何生亮,既然有了DataFrame,那我们还要DataSet干嘛呢?

下一遍,我们深入的讲一下DataSet的出现的意义,以及使用场景:)

原文地址:https://www.cnblogs.com/suzhenxiang/p/14203026.html