Spark SQL: DataFrame and Dataset

I. Dataset

A Dataset is a strongly typed collection of domain-specific objects.

// 1. DS => RDD + schema
import org.apache.spark.sql.SparkSession

case class UserInfo(name: String, age: Int) // define at top level so Spark can derive an Encoder

val spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val intRdd   = spark.sparkContext.parallelize(Seq(1, 2, 3))                              // RDD[Int] => DS with a single "value" column: 1 2 3
val tupleRdd = spark.sparkContext.parallelize(Seq(("SZ", 1), ("SD", 2)))                 // RDD[(String, Int)] => two unnamed columns (_1, _2)
val userRdd  = spark.sparkContext.parallelize(Seq(UserInfo("SZ", 1), UserInfo("SD", 2))) // RDD[UserInfo] => two named columns
import spark.implicits._ // the implicit Encoders supply the structure/class name that the RDD itself lacks
spark.createDataset(userRdd).show(false)

1. The argument to createDataset() can be a Seq, an Array, or an RDD.

2. Dataset = RDD + schema, so a Dataset shares most of the RDD operations, such as map, filter, etc. (see the sketch below).
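
A minimal sketch of both points, reusing the SparkSession and the UserInfo case class from the snippet above (the data values here are made up for illustration):

import spark.implicits._

// createDataset accepts a Seq, an Array, or an RDD
val dsFromSeq   = spark.createDataset(Seq(UserInfo("SZ", 1), UserInfo("SD", 2)))
val dsFromArray = spark.createDataset(Array(UserInfo("BJ", 3)))                               // an Array is widened to a Seq
val dsFromRdd   = spark.createDataset(spark.sparkContext.parallelize(Seq(UserInfo("SH", 4))))

// RDD-style operations work directly on the Dataset and stay strongly typed
dsFromSeq
  .filter(u => u.age > 1)        // typed filter over UserInfo
  .map(u => u.name.toLowerCase)  // typed map, yields Dataset[String]
  .show(false)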

II. DataFrame

DF => Row + schema

How do you create a DataFrame?

1) From an RDD: map to tuples or case class instances, then call toDF()

  This requires import spark.implicits._
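
The toDF snippets below reference rdd2, which the original post defines elsewhere; a plausible stand-in, assuming it is an RDD[String] of comma-separated "name,age" lines, is:

// hypothetical sample data standing in for the original "student,age" file
val rdd2 = spark.sparkContext.parallelize(Seq("zhangsan,20", "lisi,25"))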

// toDF takes tuples rather than Row, and the columns can be renamed [the tuple approach is recommended!];

   Pros: you can fix the column types and rename the columns. Cons: every column has to be written out one by one.

rdd2.map(x => {
  val strs = x.split(",")
  (strs(0), strs(1).toInt)
}).toDF("student", "age").show(false)
// with a case class, the columns cannot be renamed in toDF();
rdd2.map(x => { val strs = x.split(","); UserInfo(strs(0), strs(1).toInt) }).toDF().show(false)

2) The most primitive approach: Row + schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// RDD[Row] example 1: built from rdd2
val rdd3 = rdd2.map(x => {
  val strs = x.split(",")
  Row(strs(0), strs(1).toInt)
})
// RDD[Row] example 2: built directly from a Seq of Rows (it needs its own six-column schema, see below)
val rowrdd = spark.sparkContext.parallelize(Seq(
  Row("no1", 4.5, 5.0, 0.0, 3.0, 0.0),
  Row("no2", 0.0, 5.0, 0.5, 0.0, 5.0),
  Row("no3", 2.5, 3.0, 0.0, 4.0, 0.0),
  Row("no4", 4.5, 1.0, 0.0, 3.0, 0.0),
  Row("no5", 0.0, 0.0, 0.0, 0.0, 2.0),
  Row("no6", 4.0, 5.0, 0.0, 0.0, 1.0)
))
// schema matching rdd3
val schema = StructType(Seq(
  StructField("username", StringType, false),
  StructField("age", IntegerType, false)
))
val df = spark.createDataFrame(rdd3, schema)
df.show(false)
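
The original snippet never shows a schema for rowrdd; a sketch of one, reusing the column names from example 4) below and assuming the five score columns are doubles:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// hypothetical schema for the six-column rowrdd above
val fruitSchema = StructType(Seq(
  StructField("userId", StringType, false),
  StructField("apple", DoubleType, false),
  StructField("banana", DoubleType, false),
  StructField("orange", DoubleType, false),
  StructField("watermelon", DoubleType, false),
  StructField("melon", DoubleType, false)
))
spark.createDataFrame(rowrdd, fruitSchema).show(false)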

3) spark.read.format(...).load(...)

// Pros: no need to write out every column; cons: every column comes in as String
val orders = spark.read.format("csv")
  .load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
  .toDF("orderid", "order_date", "userid", "order_status")
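
The all-String drawback can be worked around by letting the csv reader infer the types; a minimal sketch using the same (local, project-specific) path as above:

val typedOrders = spark.read.format("csv")
  .option("header", "false")     // assumption: the sample file has no header row
  .option("inferSchema", "true") // let Spark infer column types instead of defaulting to String
  .load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
  .toDF("orderid", "order_date", "userid", "order_status")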

4) Call toDF directly on a Seq

val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate()
import spark.implicits._
val df = Seq(
  ("no1", 4.5, 5.0, 0.0, 3.0, 0.0),
  ("no2", 0.0, 5.0, 0.5, 0.0, 5.0),
  ("no3", 2.5, 3.0, 0.0, 4.0, 0.0),
  ("no4", 4.5, 1.0, 0.0, 3.0, 0.0),
  ("no5", 0.0, 0.0, 0.0, 0.0, 2.0),
  ("no6", 4.0, 5.0, 0.0, 0.0, 1.0)
).toDF("userId", "apple", "banana", "orange", "watermelon", "melon")

How are DataFrames used?

// the $"key1" syntax corresponds to the Column-typed overload of select
orderitem
  // .select($"orderid", $"countprice".cast(DataTypes.DoubleType))
  // The select/cast above is needed when calling sum() directly as below;
  // with agg(sum(...)) it is not, so the agg method is preferable!
  .groupBy("orderid").sum("countprice")
  .withColumnRenamed("sum(countprice)", "cp")
  .join(orders, Seq("orderid"), "inner") // Seq(...) is used so the join can be on multiple columns
  .join(custo, Seq("userid"), "inner")
  .orderBy(desc("cp")).limit(1)
  .select("orderid", "cp", "lname", "fname")
  .show(false)
/**
 * +-------+------------------+--------+-----+
 * |orderid|cp                |lname   |fname|
 * +-------+------------------+--------+-----+
 * |68703  |3449.9100000000003|Victoria|Smith|
 * +-------+------------------+--------+-----+
 */

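Since the note above recommends agg(), here is a minimal sketch of the same query in that form; orderitem, orders, and custo are assumed to be DataFrames loaded elsewhere in the original project (e.g. via spark.read.format("csv") as in 3), with the cast done inside agg:

import spark.implicits._
import org.apache.spark.sql.functions.{desc, sum}

orderitem
  .groupBy("orderid")
  .agg(sum($"countprice".cast("double")).alias("cp")) // no separate select or withColumnRenamed needed
  .join(orders, Seq("orderid"), "inner")
  .join(custo, Seq("userid"), "inner")
  .orderBy(desc("cp")).limit(1)
  .select("orderid", "cp", "lname", "fname")
  .show(false)
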
>>> The relationship between RDD, DataFrame, and Dataset is shown in a diagram in the original post.

Original post: https://www.cnblogs.com/sabertobih/p/13742073.html