Spark SQL之Catalog API介绍和使用

Catalog API                                                                                                     

  1. Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面就是管理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及持久化的元数据(比如Hivemeta store或者HCatalog)。
  2. Spark的早期版本是没有标准的API来访问这些元数据的用户通常使用查询语句(比如show tables)来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同的元数据类型的操作也是不一样的。
  3. 这种情况在Spark2.0中得到改变。Spark2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据
  4. 接下来将介绍如何使用catalog API。

访问Catalog                                                                                                     

  • Catalog可以通过SparkSession获取,下面代码展示如何获取Catalog:
/**
 * User: 过往记忆
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
 
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d50ea49
 
scala> val catalog = sparkSession.catalog
catalog: org.apache.spark.sql.catalog.Catalog = org.apache.spark.sql.internal.CatalogImpl@17308af1

Querying the databases                                                                                 

我们一旦创建好catalog对象之后,我们可以使用它来查询元数据中的数据库,catalog上的API返回的结果全部都是dataset

scala> catalog.listDatabases().select("name").show(false)
19/07/17 14:21:54 ERROR metastore.ObjectStore: Version information found in metastore differs 1.1.0 from expected schema version 1.2.0. Schema verififcation is disabled hive.metastore.schema.verification so setting version.
+---------+
|name     |
+---------+
|default  |
|hadoop_g6|
|ruoze_d6 |
+---------+

  • listDatabases返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数据库。如果是Hive元数据,那么他会从Hive元数据中获取所有的数据库。listDatabases返回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据

使用createTempView注册Dataframe                                                              

  • 在Spark的早期版本,我们使用registerTemTable来注册Dataframe。然而在Spark2.0中,这个API已经被遗弃了。registerTempTable名字很让人误解,因为用户会认为这个函数会将Dataframe持久化并且保证这个临时表,但实际上并不是这样的,所以社区才有意将它替换成CreateTempView。createTempView的使用方法如下:

scala> val df = spark.format("json").read("/home/hadoop/data/people.json")

scala> df.createTempView("iteblog")

  • 我们注册完一个view之后,然后就可以使用listTables函数来查询它

查询表                                                                                                               

  • 正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中的表。它会展示出SparkSQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是default)中的表。如下:

scala> catalog.listTables().select("name").show(false)
+-------------------+
|name |
+-------------------+
|customer |
|dual |
|g6_access |
|g6_access_lzo |
|g6_access_lzo_split|
|g6_access_orc |
|g6_access_orc_none |
|g6_access_par |
|g6_access_par_zip |
|g6_access_rc |
|g6_access_seq |
|makedata_job |
|order |
|traffic_info |
|tv_info |
|iteblog |
+-------------------+

show源码

// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
println(showString(numRows, truncate = 20))
} else {
println(showString(numRows, truncate = 0))
}

* @param numRows Number of rows to show
* @param truncate If set to more than 0, truncates strings to `truncate` characters and
* all cells will be aligned right.

show(numRows,truncate ),参数numRows代表展示的条数,参数truncate=true代表展示字段中的数据只能展示20 的长度,大于20的位置会被截掉,相当于隐藏,默认是true

scala> catalog.listTables().select($"name").show(2,false)
+--------+
|name |
+--------+
|customer|
|dual |
+--------+
only showing top 2 rows


scala> catalog.listTables().select($"name").show(2,true)
+--------+
| name|
+--------+
|customer|
| dual|
+--------+
only showing top 2 rows


scala> catalog.listTables().select($"name").show(2)
+--------+
| name|
+--------+
|customer|
| dual|
+--------+
only showing top 2 rows

判断某个表是否缓存                                                                                          

  • 我们可以使用Catalog提供API来检查某个表是否缓存。如下:
scala> println(catalog.isCached("iteblog"))
false

上面判断iteblog表是否缓存,结果输出false。默认情况下表是不会被缓存的,我们可以手动缓存某个表,如下:

scala> df.cache()
res12: df.type = [_corrupt_record: string]

scala> println(catalog.isCached("iteblog"))
true 
  •  现在iteblog表已经被缓存了,所有现在的输出结构是true。

删除view                                                                                                           

  • 我们可以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么他会从元数据中删除表。
scala>  catalog.dropTempView("iteblog")
res16: Boolean = true

查询已经注册的函                                                                                              

+------------+----------------------------------------+------------------+
|name        |className                               |  isTemporary|
+-------------+----------------------------------------+-----------------+
|sayhello    |com.ruozedata.UDF.HelloUDF |   false           |
|    %          | org.apache.spark.sql.catalyst.expressions.Remainder |   true|
+-------------+------------------------------------------+----------------+

 

 上面展示了10个函数及其实现类。

参考原文链接:https://www.iteblog.com/archives/1701.html#Catalog_API

原文地址:https://www.cnblogs.com/xuziyu/p/11200080.html