DataFram 的使用

DataFrame(重点)

无论是啥语言写的Spark SQL,还是用啥API,最终底层都是专成逻辑执行计划

  • SparkSession.sql()

 def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }
  • DataFrame.show(),默认显示20条,字段字符超过一定长度就会被截取

/**
   * Displays the top 20 rows of Dataset in a tabular form. Strings more than 20 characters
   * will be truncated, and all cells will be aligned right.
   *
   * @group action
   * @since 1.6.0
   */
  def show(): Unit = show(20)

DataFrame读取文件

Spark支持json,parquet,jdbc,orc,libsvm,csv,text类型文件直接读取和存储。读取返回的都是DF,但是注意text的Schema只有一个字符串字段。

scala> val usersDF = spark.read.json("/user/hadoop/examples/src/main/resources/employees.json")
usersDF: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]        


DataFrame查询操作

传参可以是字符串*,可以是Column*,使用$"columname"需要导入隐士转换

scala> usersDF.select("name", "salary").show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

DataFrame过滤操作

scala> usersDF.filter("name='Justin'").show
+------+------+
| name|salary|
+------+------+
|Justin| 3500|
+------+------+

注意在某个字段不能为空要哦过滤掉的时候,要把以下这三种情况都考虑到才可以

xxDF.filter(" a!=''  and a!=NULL and a!=null ")

DataFrame排序操作

排序涉及到降序以及升序,故最好使用Column类型:$"xxx"或df(“xxx”)

scala> usersDF.select("name", "salary").filter("salary>='4000'").sort($"salary".desc).show
+-----+------+
| name|salary|
+-----+------+
| Andy|  4500|
|Berta|  4000|
+-----+------+


scala> usersDF.select("name", "salary").filter("salary>='4000'").sort($"salary".asc).show
+-----+------+
| name|salary|
+-----+------+
|Berta|  4000|
| Andy|  4500|
+-----+------+

DataFrame Join操作

默认是等值连接,连接条件是"===="

logDF1.join(logDF2,logDF1("id")===logDF2("id"))
原文地址:https://www.cnblogs.com/xuziyu/p/11137154.html