I. Dataset
A strongly typed collection of domain-specific objects.
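// 1. Dataset => RDD + schema
import org.apache.spark.sql.SparkSession

// Assumed definition: the original uses UserInfo without showing it.
case class UserInfo(name: String, age: Int)

val spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
// Brings the implicit Encoders into scope; createDataset needs one for the element type.
import spark.implicits._

// RDD[Int] => Dataset with a single column named "value" holding 1, 2, 3
val intRdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
// RDD[(String, Int)] => two columns with the default names _1 and _2
val tupleRdd = spark.sparkContext.parallelize(Seq(("SZ", 1), ("SD", 2)))
// RDD[UserInfo] => two columns named after the case class fields
val userRdd = spark.sparkContext.parallelize(Seq(UserInfo("SZ", 1), UserInfo("SD", 2)))

spark.createDataset(userRdd).show(false)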
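1. createDataset() accepts a Seq or an RDD; a Scala Array also works because it converts implicitly to a Seq.
2. Dataset = RDD + schema, so a Dataset shares most of its functions with an RDD, such as map and filter.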
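As a minimal sketch of point 2, the lines below apply filter and map to a Dataset with ordinary Scala functions. They are written as they would be typed into spark-shell; the Person case class and the sample values are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type, defined only for this sketch.
case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[2]").appName("ds-sketch").getOrCreate()
import spark.implicits._ // supplies the Encoders that createDataset and map need

val people = spark.createDataset(Seq(Person("SZ", 1), Person("SD", 2)))

// filter and map take plain functions over Person, just as they would on an RDD[Person]
val names = people.filter(p => p.age > 1).map(p => p.name).collect()
```

Unlike an RDD, the result of map is again a Dataset, so it keeps a schema and can be passed straight to show() or to SQL-style operators.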
II. DataFrame
DF => Row + schema (a DataFrame is a Dataset[Row])
How to create a DF?
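1) From an RDD: map each element to a tuple or a case class, then call toDF()
This requires import spark.implicits._
// toDF works on tuples rather than Rows, and the columns can be renamed [the tuple approach is recommended!]
Pros: column types can be set and columns can be renamed. Cons: every column has to be written out one by one.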
rdd2.map(x => {
  val strs = x.split(",")
  (strs(0), strs(1).toInt)
}).toDF("student", "age").show(false)
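// With a case class, the column names default to the field names, so toDF() is called without arguments
// (passing one name per column, e.g. toDF("student", "age"), still overrides them)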
rdd2.map(x => {
  val strs = x.split(",")
  UserInfo(strs(0), strs(1).toInt)
}).toDF().show(false)
2) The most basic approach: Row + schema
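import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// RDD 1: Rows built by parsing each "name,age" line
val rdd3 = rdd2.map(x => {
  val strs = x.split(",")
  Row(strs(0), strs(1).toInt)
})
// RDD 2: Rows written out by hand (six columns, so it would need its own six-field schema)
val rowrdd = spark.sparkContext.parallelize(Seq(
  Row("no1", 4.5, 5.0, 0.0, 3.0, 0.0),
  Row("no2", 0.0, 5.0, 0.5, 0.0, 5.0),
  Row("no3", 2.5, 3.0, 0.0, 4.0, 0.0),
  Row("no4", 4.5, 1.0, 0.0, 3.0, 0.0),
  Row("no5", 0.0, 0.0, 0.0, 0.0, 2.0),
  Row("no6", 4.0, 5.0, 0.0, 0.0, 1.0)
))
// Schema for rdd3: one StructField(name, type, nullable) per column
val schema = StructType(Seq(
  StructField("username", StringType, false),
  StructField("age", IntegerType, false)
))
// show() returns Unit, so keep the DataFrame in df and display it in a separate step
val df = spark.createDataFrame(rdd3, schema)
df.show(false)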
3) spark.read.format(...).load(...)
// Pros: no need to write out each column. Cons: by default every CSV column is read as String.
val orders = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
.toDF("orderid","order_date","userid","order_status")
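The all-String limitation can be worked around after loading. The sketch below, written for spark-shell, shows two common options: casting individual columns, or asking the CSV reader to infer types with the inferSchema option. The temporary file and its two rows are made up for illustration and do not come from the original orders.csv.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

val spark = SparkSession.builder().master("local[2]").appName("csv-sketch").getOrCreate()

// Write a tiny headerless CSV to a temporary file (made-up rows, for illustration only).
val path = Files.createTempFile("orders", ".csv")
Files.write(path, "1,2013-07-25,11599,CLOSED\n2,2013-07-25,256,PENDING\n".getBytes("UTF-8"))

// Plain load: every column comes back as StringType.
val raw = spark.read.format("csv").load(path.toUri.toString)
  .toDF("orderid", "order_date", "userid", "order_status")

// Option A: cast the columns that should be numeric.
val casted = raw.withColumn("orderid", col("orderid").cast(IntegerType))

// Option B: let the reader infer column types (this costs an extra pass over the data).
val inferred = spark.read.format("csv").option("inferSchema", "true").load(path.toUri.toString)
  .toDF("orderid", "order_date", "userid", "order_status")
```

Casting keeps control over each type; inferSchema is shorter but depends on what the reader decides from the data, so printSchema() is worth checking afterwards.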
4) Call toDF directly on a Seq
val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate()
import spark.implicits._
val df = Seq(
  ("no1", 4.5, 5.0, 0.0, 3.0, 0.0),
  ("no2", 0.0, 5.0, 0.5, 0.0, 5.0),
  ("no3", 2.5, 3.0, 0.0, 4.0, 0.0),
  ("no4", 4.5, 1.0, 0.0, 3.0, 0.0),
  ("no5", 0.0, 0.0, 0.0, 0.0, 2.0),
  ("no6", 4.0, 5.0, 0.0, 0.0, 1.0)
).toDF("userId", "apple", "banana", "orange", "watermelon", "melon")
How to use it?
import org.apache.spark.sql.functions.desc

// The $"key1" syntax selects the overload of select that takes Column arguments
orderitem
  //.select($"orderid",$"countprice".cast(DataTypes.DoubleType))
  // sum() below only accepts numeric columns, so it needs the cast on the line above when countprice is a String;
  // agg(sum(...)) does not need the cast, so agg is the better choice!
  .groupBy("orderid").sum("countprice")
  .withColumnRenamed("sum(countprice)", "cp")
  .join(orders, Seq("orderid"), "inner") // a Seq allows joining on several columns
  .join(custo, Seq("userid"), "inner")
  .orderBy(desc("cp")).limit(1)
  .select("orderid", "cp", "lname", "fname")
/**
 * +-------+------------------+--------+-----+
 * |orderid|cp                |lname   |fname|
 * +-------+------------------+--------+-----+
 * |68703  |3449.9100000000003|Victoria|Smith|
 * +-------+------------------+--------+-----+
 */
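The difference between sum() and agg(sum()) can be sketched on a small made-up table whose price column is a String, as it would be after a plain CSV load. The data and the spark-shell style are assumptions for illustration, and the implicit cast relies on the non-ANSI type coercion rules that Spark 2.x and 3.x use by default.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[2]").appName("agg-sketch").getOrCreate()
import spark.implicits._

// Made-up order items; countprice is deliberately a String column.
val items = Seq(("1", "10.5"), ("1", "4.5"), ("2", "3.0")).toDF("orderid", "countprice")

// items.groupBy("orderid").sum("countprice") would throw an AnalysisException here,
// because the name-based sum(...) shortcut only accepts numeric columns.

// agg(sum(...)) casts the String column to Double implicitly,
// and alias names the result directly, so withColumnRenamed is not needed.
val totals = items.groupBy("orderid").agg(sum("countprice").alias("cp"))

// Collect into a Map of orderid -> total for easy inspection.
val byOrder = totals.collect().map(r => r.getString(0) -> r.getDouble(1)).toMap
```

An explicit cast before aggregating remains the safer choice when the column may contain non-numeric text, since the implicit cast turns such values into null.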
>>> The relationship is shown in the figure: