Spark学习进度10-DS&DF基础操作

有类型操作

flatMap

通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset

 val ds1=Seq("hello spark","hello hadoop").toDS()
    ds1.flatMap(item => item.split(" ")).show()

map

可以将数据集中每条数据转为另一种形式

val ds2=Seq(Person("zhangsan",15),Person("lisi",32)).toDS()
    ds2.map(p => Person(p.name,p.age*2)).show()

mapPartitions

mapPartitions 和 map 一样, 但是 map 的处理单位是每条数据, mapPartitions 的处理单位是每个分区

ds2.mapPartitions(item => {
      val persons = item.map(p => Person(p.name, p.age * 2))
      persons
    }).show()

transform

map 和 mapPartitions 以及 transform 都是转换, map 和 mapPartitions 是针对数据, 而 transform 是针对整个数据集, 这种方式最大的区别就是 transform 可以直接拿到 Dataset 进行操作

@Test
  def transform(): Unit ={
    val ds=spark.range(10)
    ds.transform(dataset => dataset.withColumn("doubleid",'id*2)).show()
  }

as

as[Type] 算子的主要作用是将弱类型的 Dataset 转为强类型的 Dataset,

@Test
  def as(): Unit ={
    val structType = StructType(
      Seq(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )

    val sourceDF = spark.read
      .schema(structType)
      .option("delimiter", "	")
      .csv("dataset/studenttab10k")

    val dataset = sourceDF.as[(String,Int,Float)]
    dataset.show()
  }

filter

用来按照条件过滤数据集

@Test
  def filter(): Unit ={
    val ds=Seq(Person("zhangsan",15),Person("lisi",32)).toDS()
    ds.filter(person => person.age>20).show()
  }

groupByKey

grouByKey 算子的返回结果是 KeyValueGroupedDataset, 而不是一个 Dataset, 所以必须要先经过 KeyValueGroupedDataset 中的方法进行聚合, 再转回 Dataset, 才能使用 Action 得出结果

@Test
  def groupByKey(): Unit ={
    val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
    val grouped = ds.groupByKey(p => p.name)
    val result: Dataset[(String, Long)] = grouped.count()
    result.show()
  }

randomSplit

randomSplit 会按照传入的权重随机将一个 Dataset 分为多个 Dataset, 传入 randomSplit 的数组有多少个权重, 最终就会生成多少个 Dataset, 这些权重的加倍和应该为 1, 否则将被标准化

@Test
  def randomSplit(): Unit ={
    val ds = spark.range(15)
    val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array[Double](2, 3))
    datasets.foreach(dataset => dataset.show())

    ds.sample(withReplacement = false, fraction = 0.4).show()
  }

orderBy

orderBy 配合 Column 的 API, 可以实现正反序排列

val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
    ds.orderBy('age.desc).show() //select * from .. order by  .. desc

sort

ds.sort('age.asc).show()

dropDuplicates

使用 dropDuplicates 可以去掉某一些列中重复的行

 @Test
  def dropDuplicates(): Unit ={
    val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
    ds.distinct().show()
    ds.dropDuplicates("age").show()
  }

distinct

根据所有列去重

ds.distinct().show()

集合操作

差集,交集,并集,limit

@Test
  def collection(): Unit ={
    val ds1=spark.range(1,10)
    val ds2=spark.range(5,15)

    //差集
    ds1.except(ds2).show()

    //交集
    ds1.intersect(ds2).show()

    //并集
    ds1.union(ds2).show()

    //limit
    ds1.limit(3).show()

  }

无类型转换

选择

select:选择某些列出现在结果集中

selectExpr :在 SQL 语句中, 经常可以在 select 子句中使用 count(age)rand() 等函数, 在 selectExpr 中就可以使用这样的 SQL 表达式, 同时使用 select 配合 expr 函数也可以做到类似的效果

@Test
  def select(): Unit ={

    val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
    ds.select('name).show()
    ds.selectExpr("sum(age)").show()

  }

withColumn:通过 Column 对象在 Dataset 中创建一个新的列或者修改原来的列

withColumnRenamed:修改列名

@Test
  def withcolumn(): Unit ={
    import org.apache.spark.sql.functions._
    val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
    ds.withColumn("random",expr("rand()")).show()
    ds.withColumn("name_new",'name).show()
    ds.withColumn("name_jok",'name === "").show()

    ds.withColumnRenamed("name","new_name").show()

  }

剪除

drop:减掉某列

@Test
  def drop(): Unit ={
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.drop('age).show()
  }

集合

groupBy:按给定的行进行分组

 @Test
  def groupBy(): Unit ={
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.groupBy('name).count().show()
  }

Column 对象

创建

val spark=SparkSession.builder().master("local[6]").appName("trans").getOrCreate()
  import spark.implicits._
  @Test
  def column(): Unit ={

    import spark.implicits._
    val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDF()

    val c1: Symbol ='name

    val c2: ColumnName =$"name"

    import org.apache.spark.sql.functions._
    val c3: Column =col("name")
    //val c4: Column =column("name")

    val c5: Column =personDF.col("name")

    val c6: Column =personDF.apply("name")

    val c7: Column =personDF("name")

    personDF.select(c1).show()
    personDF.where(c1 ==="zhangsan").show()

  }

别名和转换

@Test
  def as(): Unit ={
    val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDF()
    import org.apache.spark.sql.functions._
    //as 用法1:更换数据类型
    personDF.select(col("age").as[Long]).show()
    personDF.select('age.as[Long]).show()
    //as:用法二
    personDF.select(col("age").as("new_age")).show()
    personDF.select('age as 'new_age ).show()
  }

添加列

val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    //增加一列
    ds.withColumn("doubled",'age *2).show()

操作

like:模糊查询;isin:是否含有;sort:排序

//模糊查询
    ds.where('name like "zhang%").show()
    //排序
    ds.sort('age asc).show()
    //枚举判断
    ds.where('name isin("zhangsan","wangwu","wxlf")).show()

聚合

package cn.itcast.spark.sql

import org.apache.spark.sql.{RelationalGroupedDataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
import org.junit.Test

class Aggprocessor {
  val spark=SparkSession.builder().master("local[6]").appName("trans").getOrCreate()
  import spark.implicits._
  @Test
  def groupBy(): Unit ={

     val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("year", IntegerType),
        StructField("month", IntegerType),
        StructField("day", IntegerType),
        StructField("hour", IntegerType),
        StructField("season", IntegerType),
        StructField("pm", DoubleType)
      )
    )

    //读取数据集
    val sourceDF=spark.read
      .schema(schema)
      .option("header",value = true)
      .csv("dataset/beijingpm_with_nan.csv")

    //去掉pm为空的
    val clearDF=sourceDF.where('pm =!= Double.NaN)
    //分组
    val groupedDF: RelationalGroupedDataset = clearDF.groupBy('year, 'month)

    import org.apache.spark.sql.functions._
    //进行聚合
    groupedDF.agg(avg('pm) as("pm_avg"))
      .orderBy('pm_avg desc)
      .show()

    //方法二
    groupedDF.avg("pm")
      .select($"avg(pm)" as "pm_avg")
      .orderBy('pm_avg desc)
      .show()

    groupedDF.max("pm").show()
    groupedDF.min("pm").show()
    groupedDF.sum("pm").show()
    groupedDF.count().show()
    groupedDF.mean("pm").show()


  }

}

连接

无类型连接 join

@Test
  def join(): Unit ={
    val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0))
      .toDF("id", "name", "cityId")

    val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
      .toDF("id", "name")

    val df=person.join(cities, person.col("cityId") === cities.col("id"))
      .select(person.col("id"),
        person.col("name"),
        cities.col("name") as "city")
    
      df.createOrReplaceTempView("user_city")
    spark.sql("select id ,name,city from user_city where city=='Beijing'")
      .show()
  }

连接类型

交叉连接:cross交叉连接就是笛卡尔积, 就是两个表中所有的数据两两结对

@Test
  def crossJoin(): Unit ={

    person.crossJoin(cities)
      .where(person.col("cityId") === cities.col("id"))
      .show()

    spark.sql("select p.id,p.name,c.name from person  p cross join cities c where  p.cityId = c.id")
      .show()

  }

内连接:就是按照条件找到两个数据集关联的数据, 并且在生成的结果集中只存在能关联到的数据

@Test
  def inner(): Unit ={
    person.join(cities,person.col("cityId")===cities.col("id"),joinType = "inner")
      .show()

    spark.sql("select p.id,p.name,c.name from person p inner join cities c on p.cityId=c.id").show()
  }

全外连接:

@Test
  def fullOuter(): Unit ={
     person.join(cities,person.col("cityId") === cities.col("id"),joinType = "full")
      .show()

    spark.sql("select p.id,p.name,c.name from person p full outer join cities c on p.cityId=c.id")
      .show()
  }

左外连接

 person.join(cities,person.col("cityId") === cities.col("id"),joinType = "left")
      .show()

右外连接

person.join(cities,person.col("cityId") === cities.col("id"),joinType = "right")
      .show()
原文地址:https://www.cnblogs.com/xiaofengzai/p/14273745.html