Spark SQL Notes

Spark SQL

1. Main data structure

DataFrames

2. Getting started: SQLContext

 Steps to create one:

 val sc: SparkContext // an existing SparkContext

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 import sqlContext.implicits._ // implicitly converts an RDD to a DataFrame

3. Creating a DataFrame and DataFrame operations

  val sc: SparkContext // an existing SparkContext

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  val df = sqlContext.jsonFile("/people.json")

  0) df.show()

  1) df.printSchema()

  2) df.select("name").show()

  3) df.select(df("name"), df("age")).show()

  4) df.filter(df("age") > 21).show()

  5) df.groupBy("age").count().show()

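4. RDDs

Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs (called DataFrames since Spark 1.3).

1) Use reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.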

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
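To make the per-line transformation in the example above easier to follow, here is a minimal sketch of the same parse-and-filter logic in plain Scala, without Spark. The helper name parsePerson and the in-memory sample lines are assumptions made for illustration; they only mimic the "name, age" layout the example expects from people.txt.

```scala
// Same case class as in the reflection-based example
case class Person(name: String, age: Int)

// Hypothetical helper: the body of the two map() calls applied to a single line
def parsePerson(line: String): Person = {
  val p = line.split(",")
  Person(p(0), p(1).trim.toInt)
}

// Assumed sample lines in "name, age" format
val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")

val parsed = lines.map(parsePerson)

// Plain-Scala equivalent of: SELECT name FROM people WHERE age >= 13 AND age <= 19
val teenagerNames = parsed.filter(p => p.age >= 13 && p.age <= 19).map(_.name)

teenagerNames.foreach(n => println("Name: " + n))
```

In the Spark version, toDF() reads the field names and types of Person by reflection, which is why no schema has to be declared by hand.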

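2) Use a programmatic interface that lets you construct a schema and then apply it to an existing RDD. This method is more verbose, but it lets you build SchemaRDDs when the columns and their types are not known until runtime.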

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age" 

// Import Row.

import org.apache.spark.sql.Row; 

// Import Spark SQL data types

import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema

val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) 

// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")

 // SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")

 // The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

results.map(t => "Name: " + t(0)).collect().foreach(println)
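The example above declares every column as StringType, so age is stored as a string. As a hedged variation, the sketch below gives each column its own type. It assumes Spark 1.3 or later on the classpath; the local SparkContext, the table name typed_people, and the in-memory sample lines are assumptions made for illustration. In spark-shell, where sc already exists, skip the lines that create sc.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local SparkContext created only for this sketch
val sc = new SparkContext(
  new SparkConf().setAppName("typed-schema-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Declare each column with its own type instead of StringType everywhere
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Hypothetical sample lines in "name, age" format
val lines = sc.parallelize(Seq("Michael, 29", "Andy, 30", "Justin, 19"))

// The values in each Row must match the declared types, so age is parsed to Int
val rowRDD = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))

val typedPeople = sqlContext.createDataFrame(rowRDD, schema)
typedPeople.registerTempTable("typed_people")

// age is now an integer column, so the range filter compares numbers
val teenagerNames = sqlContext
  .sql("SELECT name FROM typed_people WHERE age >= 13 AND age <= 19")
  .collect()
  .map(_.getString(0))

teenagerNames.foreach(n => println("Name: " + n))
```

The trade-off is that the Row values have to be converted up front; a line whose age field is not a number would fail at toInt when the RDD is evaluated.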

5. Data sources

1) Loading and saving (load and save)

val df = sqlContext.load("people.parquet")

df.select("name", "age").save("namesAndAges.parquet")

  2) Choosing the format

     1. File type

    val df = sqlContext.load("people.parquet")

   df.select("name", "age").save("namesAndAges.parquet", "parquet")

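        2. Save mode

          SaveMode.ErrorIfExists (default): throw an error if data already exists at the target

          SaveMode.Append: append the new data to the existing data

          SaveMode.Overwrite: replace the existing data with the new data

          SaveMode.Ignore: leave the existing data unchanged and skip the save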

import org.apache.spark.sql.SaveMode

val df = sqlContext.load("people.parquet")

df.select("name", "age").save("namesAndAges.parquet", "parquet", SaveMode.Append)

I don't produce knowledge; I only carry it.
Original article: https://www.cnblogs.com/yyy-blog/p/5656795.html