spark学习进度20(column对象、缺省值处理)

column对象:

分类操作解释

创建

'

单引号 ' 在 Scala 中是一个特殊的符号, 通过 ' 会生成一个 Symbol 对象, Symbol 对象可以理解为是一个字符串的变种, 但是比字符串的效率高很多, 在 Spark 中, 对 Scala 中的 Symbol 对象做了隐式转换, 转换为一个 ColumnName 对象, ColumnName 是 Column 的子类, 所以在 Spark 中可以如下去选中一个列

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import spark.implicits._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c1: Symbol = 'name

$

同理, $ 符号也是一个隐式转换, 同样通过 spark.implicits 导入, 通过 $ 可以生成一个 Column 对象

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import spark.implicits._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c2: ColumnName = $"name"

col

SparkSQL 提供了一系列的函数, 可以通过函数实现很多功能, 在后面课程中会进行详细介绍, 这些函数中有两个可以帮助我们创建 Column 对象, 一个是 col, 另外一个是 column

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c3: sql.Column = col("name")

column

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c4: sql.Column = column("name")

Dataset.col

前面的 Column 对象创建方式所创建的 Column 对象都是 Free 的, 也就是没有绑定任何 Dataset, 所以可以作用于任何 Dataset, 同时, 也可以通过 Dataset 的 col 方法选择一个列, 但是这个 Column 是绑定了这个 Dataset 的, 所以只能用于创建其的 Dataset 上

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c5: sql.Column = personDF.col("name")

Dataset.apply

可以通过 Dataset 对象的 apply 方法来获取一个关联此 Dataset 的 Column 对象

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c6: sql.Column = personDF.apply("name")

apply 的调用有一个简写形式

val c7: sql.Column = personDF("name")

别名和转换

as[Type]

as 方法有两个用法, 通过 as[Type] 的形式可以将一个列中数据的类型转为 Type 类型

personDF.select(col("age").as[Long]).show()

as(name)

通过 as(name) 的形式使用 as 方法可以为列创建别名

personDF.select(col("age").as("age_new")).show()

添加列

withColumn

通过 Column 在添加一个新的列时候修改 Column 所代表的列的数据

personDF.withColumn("double_age", 'age * 2).show()

操作

like

通过 Column 的 API, 可以轻松实现 SQL 语句中 LIKE 的功能

personDF.filter('name like "%zhang%").show()

isin

通过 Column 的 API, 可以轻松实现 SQL 语句中 ISIN 的功能

personDF.filter('name isin ("hello", "zhangsan")).show()

sort

在排序的时候, 可以通过 Column 的 API 实现正反序

personDF.sort('age.asc).show()
personDF.sort('age.desc).show()
 
 
// 1. 创建 spark 对象
  val spark = SparkSession.builder()
    .master("local[6]")
    .appName("column")
    .getOrCreate()

  import spark.implicits._

  @Test
  def creation(): Unit = {
    val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS()
    val ds1: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS()
    val df: DataFrame = Seq(("zhangsan", 15), ("lisi", 10)).toDF("name", "age")

    // 2. ' 必须导入spark的隐式转换才能使用 str.intern()
    val column: Symbol = 'name

    // 3. $ 必须导入spark的隐式转换才能使用
    val column1: ColumnName = $"name"

    // 4. col 必须导入 functions
    import org.apache.spark.sql.functions._

    val column2: sql.Column = col("name")

    // 5. column 必须导入 functions
    val column3: sql.Column = column("name")

    // 这四种创建方式, 有关联的 Dataset 吗?

    ds.select(column).show()

    // Dataset 可以, DataFrame 可以使用 Column 对象选中行吗?
    df.select(column).show()

    // select 方法可以使用 column 对象来选中某个列, 那么其他的算子行吗?
    df.where(column === "zhangsan").show()

    // column 有几个创建方式, 四种
    // column 对象可以用作于 Dataset 和 DataFrame 中
    // column 可以和命令式的弱类型的 API 配合使用 select where

    // 6. dataset.col
    // 使用 dataset 来获取 column 对象, 会和某个 Dataset 进行绑定, 在逻辑计划中, 就会有不同的表现
    val column4: sql.Column = ds.col("name")
    val column5: sql.Column = ds1.col("name")

    // 这会报错
//    ds.select(column5).show()

    // 为什么要和 dataset 来绑定呢?
//    ds.join(ds1, ds.col("name") === ds1.col("name"))

    // 7. dataset.apply
    val column6: sql.Column = ds.apply("name")
    val column7: sql.Column = ds("name")
  }

 

@Test
  def as(): Unit = {
    val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS()

    // select name, count(age) as age from table group by name
    ds.select('name as "new_name").show()

    ds.select('age.as[Long]).show()
  }

 

@Test
  def api(): Unit = {
    val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS()

    // 需求一, ds 增加列, 双倍年龄
    // 'age * 2 其实本质上就是将一个表达式(逻辑计划表达式) 附着到 column 对象上
    // 表达式在执行的时候对应每一条数据进行操作
    ds.withColumn("doubled", 'age * 2).show()

    // 需求二, 模糊查询
    // select * from table where name like zhang%
    ds.where('name like "zhang%").show()

    // 需求三, 排序, 正反序
    ds.sort('age asc).show()

    // 需求四, 枚举判断
    ds.where('name isin ("zhangsan", "wangwu", "zhaoliu")).show()
  }

 

 

 

缺省值处理:

缺失值的处理思路

如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值

什么是缺失值

一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说 null, 比如说空字符串

20190527220736

关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响

缺失值如何产生的

20190527215718

Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据

例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改 MySQL 表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事.

缺失值的类型

常见的缺失值有两种

  • nullNaN 等特殊类型的值, 某些语言中 null 可以理解是一个对象, 但是代表没有对象, NaN 是一个数字, 可以代表不是数字

    针对这一类的缺失值, Spark 提供了一个名为 DataFrameNaFunctions 特殊类型来操作和处理

  • "Null""NA"" " 等解析为字符串的类型, 但是其实并不是常规字符串数据

    针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破

    DataFrameNaFunctions

    当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值, DataFrameNaFunctions 中包含一系列针对空值数据的方案

    • DataFrameNaFunctions.drop 可以在当某行中包含 null 或 NaN 的时候丢弃此行

    • DataFrameNaFunctions.fill 可以在将 null 和 NaN 充为其它值

    • DataFrameNaFunctions.replace 可以把 null 或 NaN 替换为其它值, 但是和 fill 略有一些不同, 这个方法针对值来进行替换

    •  // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("null processor")
          .getOrCreate()
      
        @Test
        def nullAndNaN(): Unit = {
      
      
          // 2. 导入数据集
      
          // 3. 读取数据集
          //    1. 通过Saprk-csv自动的推断类型来读取, 推断数字的时候会将 NaN 推断为 字符串
      //    spark.read
      //      .option("header", true)
      //      .option("inferSchema", true)//这里是推断
      //      .csv(...)
          //    2. 直接读取字符串, 在后续的操作中使用 map 算子转类型
      //    spark.read.csv().map( row => row... )
          //    3. 指定 Schema, 不要自动推断
          val schema = StructType(
            List(
              StructField("id", LongType),
              StructField("year", IntegerType),
              StructField("month", IntegerType),
              StructField("day", IntegerType),
              StructField("hour", IntegerType),
              StructField("season", IntegerType),
              StructField("pm", DoubleType)
            )
          )
      
          val sourceDF = spark.read
            .option("header", value = true)
            .schema(schema)
            .csv("dataset/beijingpm_with_nan.csv")
      
          sourceDF.show()
      
          // 4. 丢弃
          // 2019, 12, 12, NaN
          // 规则:
          //      1. any, 只有有一个 NaN 就丢弃
          sourceDF.na.drop("any").show()
          sourceDF.na.drop().show()
          //      2. all, 所有数据都是 NaN 的行才丢弃
          sourceDF.na.drop("all").show()
          //      3. 某些列的规则
          sourceDF.na.drop("any", List("year", "month", "day", "hour")).show()
      
          // 5. 填充
          // 规则:
          //     1. 针对所有列数据进行默认值填充
          sourceDF.na.fill(0).show()
          //     2. 针对特定列填充
          sourceDF.na.fill(0, List("year", "month")).show()
        }

     

     

     

     

     

     遇到字符串的时候:

      @Test
      def strProcessor(): Unit = {
        // 读取数据集
        val sourceDF = spark.read
          .option("header", value = true)
          .option("inferSchema", value = true)
          .csv("dataset/BeijingPM20100101_20151231.csv")
    
    //    sourceDF.show()
    
        // 1. 丢弃
        import spark.implicits._
    //    sourceDF.where('PM_Dongsi =!= "NA").show()
    
        // 2. 替换
        import org.apache.spark.sql.functions._
        // select name, age, case
        // when ... then ...
        // when ... then ...
        // else
        sourceDF.select(
          'No as "id", 'year, 'month, 'day, 'hour, 'season,
          when('PM_Dongsi === "NA", Double.NaN)//当啥的时候进行替换
            .otherwise('PM_Dongsi cast DoubleType)//转换类型
            .as("pm")
        ).show()
    
        // 原类型和转换过后的类型, 必须一致
        sourceDF.na.replace("PM_Dongsi", Map("NA" -> "NaN", "NULL" -> "null")).show()
        //指定哪一行进行怎样的替换
      }

原文地址:https://www.cnblogs.com/dazhi151/p/14274848.html