Spark SQL

1 Overview
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
 
2 DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
 
2.1 SQLContext
The entry point into all functionality in Spark SQL is the SQLContext class, or one of its descendants. To create a basic SQLContext, all you need is a SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
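Outside the Spark shell there is no predefined sc, so a standalone program has to build the SparkContext first. The following is a minimal sketch under assumptions: the object name, the application name, the local[*] master and the two sample rows are all hypothetical and only serve to illustrate the construction.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SqlContextExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical app name; "local[*]" runs Spark in-process on all cores.
    val conf = new SparkConf().setAppName("SqlContextExample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Same construction as in the text: a SQLContext only needs a SparkContext.
    val sqlContext = new SQLContext(sc)

    // Brings toDF() into scope for RDDs of tuples and case classes.
    import sqlContext.implicits._

    // Hypothetical sample data, turned into a DataFrame with named columns.
    val df = sc.parallelize(Seq(("Alice", 30), ("Bob", 12))).toDF("name", "age")
    df.show()

    sc.stop()
  }
}
```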
 
2.2 Creating DataFrames
val df = sqlContext.read.json("/home/slh/data/people.json")
 
2.3 Operations
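// Display the content of the DataFrame.
df.show()
// Print the schema in a tree format.
df.printSchema()
// Select only the "name" column.
df.select("name").show()
// Select everybody, but increment the age by 1.
df.select(df("name"), df("age") + 1).show()
// Select people older than 13.
df.filter(df("age") > 13).show()
// Count people by age.
df.groupBy("age").count().show()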
 
2.4 Running SQL Queries
The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.
// "table" is a placeholder for the name of a registered table.
val df = sqlContext.sql("select * from table")
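Before a query can name a table, the DataFrame has to be registered under that name. A short sketch of the full round trip, assuming the sqlContext from section 2.1 and the people.json file from section 2.2 with name and age fields; the table name people and the age range are illustrative choices.

```scala
// Assumes `sqlContext` exists as created in section 2.1.
val people = sqlContext.read.json("/home/slh/data/people.json")

// Register the DataFrame under a name that SQL statements can refer to.
people.registerTempTable("people")

// The result of sql(...) is itself a DataFrame.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
```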
 
2.5 Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
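The reflection-based method might look like the sketch below. It assumes sc and sqlContext exist as in section 2.1; the Person case class and the sample records are hypothetical.

```scala
// Brings toDF() into scope for RDDs of case class objects.
import sqlContext.implicits._

// Hypothetical record type: the field names become the column names.
case class Person(name: String, age: Int)

val peopleRDD = sc.parallelize(Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)))

// The schema (name: string, age: int) is inferred from the case class by reflection.
val peopleDF = peopleRDD.toDF()
peopleDF.printSchema()
```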
The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.
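The programmatic method can be sketched as follows, again assuming sc and sqlContext from section 2.1. The column names and the comma-separated sample lines are hypothetical; in a real application the names would typically arrive at runtime, for example from a configuration file.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Column names that are only known at runtime (hard-coded here for illustration).
val fieldNames = Seq("name", "city")

// Step 1: build the schema from the column names.
val schema = StructType(fieldNames.map(n => StructField(n, StringType, nullable = true)))

// Step 2: convert the records of the original RDD to Rows.
val rowRDD = sc.parallelize(Seq("Yin,Columbus", "Michael,Berkeley"))
  .map(_.split(","))
  .map(p => Row(p(0), p(1)))

// Step 3: apply the schema to the RDD of Rows.
val cityDF = sqlContext.createDataFrame(rowRDD, schema)
cityDF.printSchema()
```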
 
3 Data Sources
Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on like a normal RDD and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.
 
3.1 Load/Save Functions
Generic (the default data source, parquet unless spark.sql.sources.default is set otherwise, is used):
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Manually specifying the format:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Save Modes:
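SaveMode.ErrorIfExists (default): throw an exception if data already exists at the target.
SaveMode.Append: append the contents of the DataFrame to the existing data.
SaveMode.Overwrite: replace the existing data with the contents of the DataFrame.
SaveMode.Ignore: leave the existing data unchanged and do not save the DataFrame.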
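A save mode is chosen with mode(...) on the writer. A short sketch, assuming the sqlContext from section 2.1 and the example files shipped with Spark; the output path names.parquet is hypothetical.

```scala
import org.apache.spark.sql.SaveMode

val peopleJson = sqlContext.read.format("json").load("examples/src/main/resources/people.json")

// Replace any earlier output instead of failing with the default ErrorIfExists.
peopleJson.select("name", "age").write.format("parquet").mode(SaveMode.Overwrite).save("namesAndAges.parquet")

// The mode can also be given as a string: "error", "append", "overwrite" or "ignore".
peopleJson.select("name").write.mode("append").parquet("names.parquet")
```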
 
3.2 Parquet Files
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
 
// `people` is assumed to be an RDD of case class objects defined earlier.
// Importing the implicits adds toDF() to such RDDs.
import sqlContext.implicits._

// Convert the RDD to a DataFrame with toDF(), allowing it to be stored using Parquet.
people.toDF().write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")
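One way to see that the schema survives a round trip is to compare the column names before and after. This is only a sketch: it assumes the sqlContext from section 2.1 and the people.json example file shipped with Spark, and the output path people_roundtrip.parquet is hypothetical.

```scala
// Load a DataFrame whose schema was inferred from JSON.
val original = sqlContext.read.json("examples/src/main/resources/people.json")

// Write it as Parquet (overwriting any earlier run), then read it back.
original.write.mode("overwrite").parquet("people_roundtrip.parquet")
val restored = sqlContext.read.parquet("people_roundtrip.parquet")

// Both schemas should list the same columns.
original.printSchema()
restored.printSchema()
```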
 
3.3 JSON Datasets
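// A JSON dataset is pointed to by path. Each line of the file must be a separate, self-contained JSON object.
val path = "examples/src/main/resources/people.json"
// The schema is inferred automatically while loading.
val people = sqlContext.read.json(path)

// A DataFrame can also be created from an RDD[String] that stores one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)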
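Nested JSON objects are inferred as struct columns, and their fields can be addressed with dot notation. A small sketch, assuming sc and sqlContext from section 2.1; the variable names are illustrative.

```scala
val nestedRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val nested = sqlContext.read.json(nestedRDD)

// "address" is inferred as a struct with the fields "city" and "state".
nested.printSchema()

// Select a top-level column together with a field of the nested struct.
val cities = nested.select("name", "address.city")
cities.show()
```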
 
3.4 Hive Tables
Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.
 
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
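With a HiveContext, queries can be written in HiveQL. The sketch below assumes a Hive-enabled Spark build, an existing sc, and the kv1.txt sample file from the Spark distribution; the table name src is illustrative.

```scala
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Create a Hive table and load the sample key/value file into it.
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL and return DataFrames.
val kv = hiveContext.sql("FROM src SELECT key, value")
kv.collect().foreach(println)
```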
Original article: https://www.cnblogs.com/sunflower627/p/4997655.html