Spark 学习笔记:(三)Spark SQL

参考:https://spark.apache.org/docs/latest/sql-programming-guide.html#overview

   http://www.csdn.net/article/2015-04-03/2824407

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.

1)在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

2)A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data.

3)The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.

val df = sqlContext.sql("SELECT * FROM table")   //sql接口

创建DataFrames:

With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources. 

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
  • Two different methods for converting existing RDDs into DataFrames.

    • The first method uses reflection to infer the schema of an RDD that contains specific types of objects. 

The case class defines the schema of the table=>The names of the arguments to the case class are read using reflection and become the names of the columns=>This RDD can be implicitly converted to a DataFrame and then be registered as a table=>Tables can be used in subsequent SQL statements.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    •  Aprogrammatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.
  • From data sources: 
val df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")

or   

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")   
原文地址:https://www.cnblogs.com/aezero/p/spark.html