spark 数据读取与保存

spark支持的常见文件格式如下:

文本,json,CSV,SequenceFiles,Protocol buffers,对象文件

1.文本

只需要使用文件路径作为参数调用SparkContext 中的textFile() 函数,就可以读取一个文
本文件;

scala> val lines=sc.textFile("/tmp/20171024/20171024.txt")

lines: org.apache.spark.rdd.RDD[String] = /tmp/20171024/20171024.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> lines.collect

res0: Array[String] = Array(10 20 30 50 80 100 60 90 60 60 31 80 70 51 50)

求每个文件的平均值:

scala> val linesall=sc.wholeTextFiles("/tmp/20171024/2017*.txt")

linesall: org.apache.spark.rdd.RDD[(String, String)] = /tmp/20171024/2017*.txt MapPartitionsRDD[7] at wholeTextFiles at <console>:24

scala> linesall.collect

res4: Array[(String, String)] = Array((hdfs://localhost:9000/tmp/20171024/20171024.txt,"10 20 30 50 80 100 60 90 60 60 31 80 70 51 50 "), (hdfs://localhost:9000/tmp/20171024/20171026.txt,"100 500 600 800 10 30 66 96 89 80 100 "))

scala> val result=linesall.mapValues{y=>val nums=y.split(" ").map(x=>x.toDouble);nums.sum/nums.size}.collect
result: Array[(String, Double)] = Array((hdfs://localhost:9000/tmp/20171024/20171024.txt,56.13333333333333), (hdfs://localhost:9000/tmp/20171024/20171026.txt,224.63636363636363))

保存计算结果:

scala> linesall.mapValues{y=>val nums=y.split(" ").map(x=>x.toDouble);nums.sum/nums.size}.saveAsTextFile("/tmp/20171024/result0.txt")

查看保存的结果:

[root@host bin]# hdfs dfs -lsr /tmp/20171024/result0*
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r--   1 root supergroup          0 2017-10-26 17:00 /tmp/20171024/result0.txt/_SUCCESS
-rw-r--r--   1 root supergroup         68 2017-10-26 17:00 /tmp/20171024/result0.txt/part-00000
-rw-r--r--   1 root supergroup         69 2017-10-26 17:00 /tmp/20171024/result0.txt/part-00001

 2.json

 读取json,将数据作为文本文件读取,然后对JSON 数据进行解析。

 scala> import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.HiveContext

scala> val hiveCtx = new HiveContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details hiveCtx: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1dbb9a4a

scala> val json=hiveCtx.jsonFile("/tmp/20171024/namejson.txt")

warning: there was one deprecation warning; re-run with -deprecation for details json: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]

scala> json.printSchema()

root

 |-- age: long (nullable = true)

 |-- id: long (nullable = true)

 |-- name: string (nullable = true)

scala> json.registerTempTable("t_name")

warning: there was one deprecation warning; re-run with -deprecation for details

scala>

scala> val all=hiveCtx.sql("select * from t_name")

all: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]

scala> all.show

+---+---+-----+

|age| id| name|

+---+---+-----+

| 18|  1|  leo|

| 19|  2| jack|

| 17|  3|marry|

+---+---+-----+

3.读取csv

scala> import java.io.StringReader import java.io.StringReader

scala> import au.com.bytecode.opencsv.CSVReader import au.com.bytecode.opencsv.CSVReader

scala> import scala.collection.JavaConversions._

scala> val input=sc.wholeTextFiles("/tmp/20171024/game*.csv") input: org.apache.spark.rdd.RDD[(String, String)] = /tmp/20171024/game*.csv MapPartitionsRDD[1] at wholeTextFiles at <console>:26

scala> input.collect

res0: Array[(String, String)] = Array((hdfs://localhost:9000/tmp/20171024/gamelist.csv,""9041","167","百人牛牛" "9041","174","将相和" "9041","152","百家乐" "9041","194","血战到底" "9041","4009","推到胡" "9041","4010","红中王" "9041","4098","二人麻将" "9041","4077","福建麻将" "9041","4039","血流成河" "9041","178","178" "))

scala> case class gamelist(id:String,subid:String,name:String)

defined class gamelist

scala> val result = input.flatMap { case (_, txt) => val reader = new CSVReader(new StringReader(txt));reader.readAll().map(x => gamelist(x(0), x(1), x(2))) }
result: org.apache.spark.rdd.RDD[gamelist] = MapPartitionsRDD[2] at flatMap at <console>:33

scala> result.collect
res1: Array[gamelist] = Array(gamelist(9041,167,百人牛牛), gamelist(9041,174,将相和), gamelist(9041,152,百家乐), gamelist(9041,194,血战到底), gamelist(9041,4009,推到胡), gamelist(9041,4010,红中王), gamelist(9041,4098,二人麻将), gamelist(9041,4077,福建麻将), gamelist(9041,4039,血流成河), gamelist(9041,178,178))

sparksql读取csv如下:

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqltext=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details

  注释:.option("header","false") //如果在csv第一行没有列头就是"false"
  .option("inferSchema",true.toString)//这是自动推断属性列的数据类型。

scala> val data=sqltext.read.format("com.databricks.spark.csv").option("header","false").option("inferSchema",true.toString).load("/tmp/20171024/game*.csv")
data: org.apache.spark.sql.DataFrame = [_c0: int, _c1: int ... 1 more field]
scala> data.collect
res25: Array[org.apache.spark.sql.Row] = Array([9041,167,百人牛牛], [9041,174,将相和], [9041,152,百家乐], [9041,194,血战到底], [9041,4009,推到胡], [9041,4010,红中王], [9041,4098,二人麻将], [9041,4077,福建麻将], [9041,4039,血流成河], [9041,178,178])nt ... 1 more field]


scala> import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.types.{StructType, StructField, StringType}

scala> val struct1=StructType(StructType(Array(StructField("id",StringType,true),StructField("subid",StringType,true),StructField("name",StringType,true))))
struct1: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(subid,StringType,true), StructField(name,StringType,true))

scala> val data=sqltext.read.schema(struct1).format("com.databricks.spark.csv").option("header","false").option("inferSchema",true.toString).load("/tmp/20171024/game*.csv")

data: org.apache.spark.sql.DataFrame = [id: string, subid: string ... 1 more field]

scala> data.collect

res27: Array[org.apache.spark.sql.Row] = Array([9041,167,百人牛牛], [9041,174,将相和], [9041,152,百家乐], [9041,194,血战到底], [9041,4009,推到胡], [9041,4010,红中王], [9041,4098,二人麻将], [9041,4077,福建麻将], [9041,4039,血流成河], [9041,178,178])

scala> data.select("id","name").collect
res33: Array[org.apache.spark.sql.Row] = Array([9041,百人牛牛], [9041,将相和], [9041,百家乐], [9041,血战到底], [9041,推到胡], [9041,红中王], [9041,二人麻将], [9041,福建麻将], [9041,血流成河], [9041,178])


scala> data.registerTempTable("game")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> sqltext.sql("select * from game").show
+----+-----+----+
|  id|subid|name|
+----+-----+----+
|9041|  167|百人牛牛|
|9041|  174| 将相和|
|9041|  152| 百家乐|
|9041|  194|血战到底|
|9041| 4009| 推到胡|
|9041| 4010| 红中王|
|9041| 4098|二人麻将|
|9041| 4077|福建麻将|
|9041| 4039|血流成河|
|9041|  178| 178|
+----+-----+----+scala> sqltext.sql("select * from game where subid>200").show
+----+-----+----+
|  id|subid|name|
+----+-----+----+
|9041| 4009| 推到胡|
|9041| 4010| 红中王|
|9041| 4098|二人麻将|
|9041| 4077|福建麻将|
|9041| 4039|血流成河|
+----+-----+----+

4.读取SequenceFile

SequenceFile 是由没有相对关系结构的键值对文件组成的常用Hadoop 格式。SequenceFile
文件有同步标记,Spark 可以用它来定位到文件中的某个点,然后再与记录的边界对
齐。这可以让Spark 使用多个节点高效地并行读取SequenceFile 文件。SequenceFile 也是
Hadoop MapReduce 作业中常用的输入输出格式,所以如果你在使用一个已有的Hadoop 系
统,数据很有可能是以SequenceFile 的格式供你使用的。SequenceFile 是由实现Hadoop 的Writable
接口的元素组成。

scala>     val rdd = sc.parallelize(List(("wwww", 3), ("tttt", 6), ("gggg", 2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:34

scala>     rdd.saveAsSequenceFile("/tmp/20171024/sqf.txt")

[root@host bin]# hdfs dfs -lsr /tmp/20171024/sqf*
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r--   1 root supergroup          0 2017-10-30 15:37 /tmp/20171024/sqf.txt/_SUCCESS
-rw-r--r--   1 root supergroup         85 2017-10-30 15:37 /tmp/20171024/sqf.txt/part-00000
-rw-r--r--   1 root supergroup        103 2017-10-30 15:37 /tmp/20171024/sqf.txt/part-00001
-rw-r--r--   1 root supergroup        101 2017-10-30 15:37 /tmp/20171024/sqf.txt/part-00002
-rw-r--r--   1 root supergroup        103 2017-10-30 15:37 /tmp/20171024/sqf.txt/part-00003


scala> import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.{IntWritable, Text}

scala> val output = sc.sequenceFile("/tmp/20171024/sqf.txt", classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.toString)} output: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:25

scala> output.collect res0: Array[(String, String)] = Array((wwww,3), (tttt,6), (gggg,2))   

读取SequenceFile  

scala> import org.apache.hadoop.io.{IntWritable, Text}

import org.apache.hadoop.io.{IntWritable, Text}

scala> val data = sc.sequenceFile("/spark/parinum/p*", classOf[Text], classOf[IntWritable]).map{case (x,y)=>(x.toString,y.get)}

data: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[216] at map at <console>:37

scala> data.collect

res229: Array[(String, Int)] = Array((zhangsan,100), (wangwu,250), (xiaoma,120), (laozhan,300), (tiandi,60))

保存SequenceFile

scala> numpairdd.collect

res224: Array[(String, Int)] = Array((zhangsan,100), (wangwu,250), (xiaoma,120), (laozhan,300), (tiandi,60))

scala> numpairdd.saveAsSequenceFile("/spark/parinum")

查看hdfs

[root@host conf]# hdfs dfs -ls -R /spark

SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

drwxr-xr-x   - root supergroup          0 2018-06-25 16:08 /spark/parinum

-rw-r--r--   1 root supergroup          0 2018-06-25 16:08 /spark/parinum/_SUCCESS

-rw-r--r--   1 root supergroup        125 2018-06-25 16:08 /spark/parinum/part-00000

-rw-r--r--   1 root supergroup        143 2018-06-25 16:08 /spark/parinum/part-00001

----------------------------

读取人员信息,并按得分逆序排序,如果得分相同则按年龄顺序排列

scala> val lines=sc.textFile("/tmp/person.txt")
lines: org.apache.spark.rdd.RDD[String] = /tmp/person.txt MapPartitionsRDD[5] at textFile at <console>:24

scala> lines.collect
res10: Array[String] = Array(2,zhangsan,50,866, 4,laoliu,522,30, 5,zhangsan,20,565, 6,limi,522,65, 1,xiliu,50,6998, 7,llihmj,23,565)

scala> lines.map(_.split(",")).map(arr=>person(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
res11: org.apache.spark.rdd.RDD[person] = MapPartitionsRDD[7] at map at <console>:29

scala> case class person(id:Long,name:String,age:Int,fv:Int)
defined class person

scala> val userRdd=lines.map(_.split(",")).map(arr=>person(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
userRdd: org.apache.spark.rdd.RDD[person] = MapPartitionsRDD[9] at map at <console>:28

scala> val pdf=userRdd.toDF()
pdf: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> pdf.registerTempTable("person")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlcon=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlcon: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@65502709

scala> sqlcon.sql("select * from person")
res14: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> sqlcon.sql("select * from person").show
+---+--------+---+----+
| id| name|age| fv|
+---+--------+---+----+
| 2|zhangsan| 50| 866|
| 4| laoliu|522| 30|
| 5|zhangsan| 20| 565|
| 6| limi|522| 65|
| 1| xiliu| 50|6998|
| 7| llihmj| 23| 565|
+---+--------+---+----+

scala> sqlcon.sql("select * from person order by fv desc,age ").show
+---+--------+---+----+
| id| name|age| fv|
+---+--------+---+----+
| 1| xiliu| 50|6998|
| 2|zhangsan| 50| 866|
| 5|zhangsan| 20| 565|
| 7| llihmj| 23| 565|
| 6| limi|522| 65|
| 4| laoliu|522| 30|
+---+--------+---+----+

scala> sqlcon.sql("select * from person order by fv desc,age limit 4 ").show
+---+--------+---+----+
| id| name|age| fv|
+---+--------+---+----+
| 1| xiliu| 50|6998|
| 2|zhangsan| 50| 866|
| 5|zhangsan| 20| 565|
| 7| llihmj| 23| 565|
+---+--------+---+----+

scala> val pdf1=sqlcon.sql("select name,age,fv from person order by fv desc,age limit 4 ")
pdf1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> pdf1.show
+--------+---+----+
|    name|age|  fv|
+--------+---+----+
|   xiliu| 50|6998|
|zhangsan| 50| 866|
|zhangsan| 20| 565|
|  llihmj| 23| 565|
+--------+---+----+

scala> pdf1.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 50|    2|
| 20|    1|
| 23|    1|
+---+-----+

scala> pdf1.write.csv("/tmp/pdf2")

查看hdfs:

[root@host tmpdata]# hdfs dfs -ls /tmp/pdf2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-08-07 10:51 /tmp/pdf2/_SUCCESS
-rw-r--r--   1 root supergroup         60 2018-08-07 10:51 /tmp/pdf2/part-00000-f4a16495-ea53-4332-b9a7-06e38036a7ac-c000.csv
[root@host tmpdata]# hdfs dfs -cat /tmp/pdf2/par*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
xiliu,50,6998
zhangsan,50,866
zhangsan,20,565
llihmj,23,565

scala> pdf.printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- fv: integer (nullable = false)

scala> pdf1.write.json("/tmp/pdf1json")

查看HDFS:

[root@host tmpdata]# hdfs dfs -ls /tmp/pdf1json
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-08-07 10:57 /tmp/pdf1json/_SUCCESS
-rw-r--r--   1 root supergroup        148 2018-08-07 10:57 /tmp/pdf1json/part-00000-f7246fb2-ad7b-49a6-bcb0-919c9102d9f2-c000.json
[root@host tmpdata]# hdfs dfs -cat /tmp/pdf1json/par*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"name":"xiliu","age":50,"fv":6998}
{"name":"zhangsan","age":50,"fv":866}
{"name":"zhangsan","age":20,"fv":565}
{"name":"llihmj","age":23,"fv":565}

--------------------------

scala> pdf1.write.text("/tmp/pdf1text")
org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 3 columns.;
  at org.apache.spark.sql.execution.datasources.text.TextFileFormat.verifySchema(TextFileFormat.scala:46)
  at org.apache.spark.sql.execution.datasources.text.TextFileFormat.prepareWrite(TextFileFormat.scala:66)

scala> pdf1.registerTempTable("tb_pdf")

warning: there was one deprecation warning; re-run with -deprecation for details

scala> val sqlcon=new SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details sqlcon: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@66000786

scala> sqlcon.sql("select * from tb_pdf")

res31: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]


scala> sqlcon.sql("select * from tb_pdf").show
+--------+---+----+
|    name|age|  fv|
+--------+---+----+
|   xiliu| 50|6998|
|zhangsan| 50| 866|
|zhangsan| 20| 565|
|  llihmj| 23| 565|
+--------+---+----+

scala> sqlcon.sql("desc tb_pdf").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    name|   string|   null|
|     age|      int|   null|
|      fv|      int|   null|
+--------+---------+-------+ 

-------------------------------------------------------------------------------------------------------------------

scala> pdf.cube("age").mean("fv")

res43: org.apache.spark.sql.DataFrame = [age: int, avg(fv): double]

scala> pdf.cube("age").mean("fv").show
+----+------------------+
| age|           avg(fv)|
+----+------------------+
|  23|             565.0|
|null|1514.8333333333333|
|  50|            3932.0|
| 522|              47.5|
|  20|             565.0|
+----+------------------+

scala> pdf.describe("age").show
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 6|
|   mean|197.83333333333334|
| stddev|251.42348073850752|
|    min|                20|
|    max|               522|
+-------+------------------+

scala> pdf.filter("age>50").show
+---+------+---+---+
| id|  name|age| fv|
+---+------+---+---+
|  4|laoliu|522| 30|
|  6|  limi|522| 65|
+---+------+---+---+

scala> pdf.select(col("age")+1,col("name")).show
+---------+--------+
|(age + 1)|    name|
+---------+--------+
|       51|zhangsan|
|      523|  laoliu|
|       21|zhangsan|
|      523|    limi|
|       51|   xiliu|
|       24|  llihmj|
+---------+--------+

scala> pdf.select("age"+1,"name").show
org.apache.spark.sql.AnalysisException: cannot resolve '`age1`' given input columns: [id, name, age, fv];;
'Project ['age1, name#6]

...........................

scala> pdf.select(col("age")+1,"name").show

<console>:37: error: overloaded method value select with alternatives:
  [U1, U2](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1], c2: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U2])org.apache.spark.sql.Dataset[(U1, U2)] <and>
  (col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
  (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.Column, String)
       pdf.select(col("age")+1,"name").show
           ^

scala> pdf.select("name","age").where("fv>500").show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 50|
|zhangsan| 20|
| xiliu| 50|
| llihmj| 23|
+--------+---+

 DataFrame可以通过多种来源创建:结构化数据文件,hive的表,外部数据库,或者RDDs

Spark SQL如何使用
首先,利用sqlContext从外部数据源加载数据为DataFrame
然后,利用DataFrame上丰富的api进行查询、转换
最后,将结果进行展现或存储为各种外部数据形式

Spark on Hive和Hive on Spark

Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。

Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。

DataFrame也是一个分布式数据容器。与RDD类似,然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

 

DataFrame的底层封装的是RDD,只不过RDD的泛型是Row类型。

通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)

1.创建RDD

scala> val lines=sc.textFile("/tmp/person.txt") lines: org.apache.spark.rdd.RDD[String] = /tmp/person.txt MapPartitionsRDD[5] at textFile at <console>:24

scala> lines.collect res10: Array[String] = Array(2,zhangsan,50,866, 4,laoliu,522,30, 5,zhangsan,20,565, 6,limi,522,65, 1,xiliu,50,6998, 7,llihmj,23,565)

2.对每行使用列分割符进行分割

3.定义case class (相当于表的schema)

4.将rdd与case class关联

scala> case class person(id:Long,name:String,age:Int,fv:Int) defined class person

scala> val userRdd=lines.map(_.split(",")).map(arr=>person(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
userRdd: org.apache.spark.rdd.RDD[person] = MapPartitionsRDD[9] at map at <console>:28

5.rdd转化为dataframe

scala> val pdf=userRdd.toDF()

pdf: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

6.对dataframe进行处理

 动态创建Schema将非json格式的RDD转换成DataFrame(建议使用)

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.{SQLContext,Row}
import org.apache.spark.sql.{SQLContext, Row}

scala> val lines=sc.textFile("/tmp/person.txt")
lines: org.apache.spark.rdd.RDD[String] = /tmp/person.txt MapPartitionsRDD[7] at textFile at <console>:31

scala> val rowrdd=lines.map(_.split("[,]"))
rowrdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:33

scala> val schema=StructType(List(StructField("id",LongType,true),StructField("name",StringType,true),StructField("age",IntegerType,true),StructField("fv",IntegerType,true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(fv,IntegerType,true))

scala> val rows=rowrdd.map(x=>Row(x(0).toLong,x(1).toString,x(2).toInt,x(3).toInt))
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[30] at map at <console>:35

scala> val sqlcon=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlcon: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2bd13c

scala> val personDF=sqlcon.createDataFrame(rows,schema)
personDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> personDF.show
+---+--------+---+----+
| id| name|age| fv|
+---+--------+---+----+
| 2|zhangsan| 50| 866|
| 4| laoliu|522| 30|
| 5|zhangsan| 20| 565|
| 6| limi|522| 65|
| 1| xiliu| 50|6998|
| 7| llihmj| 23| 565|
+---+--------+---+----+

scala> personDF.registerTempTable("tb_person")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> sqlcon.sql("select * from tb_person where age>500").show
+---+------+---+---+
| id| name|age| fv|
+---+------+---+---+
| 4|laoliu|522| 30|
| 6| limi|522| 65|
+---+------+---+---+

scala> df.write.parquet("/tmp/parquet")

[root@host tmpdata]# hdfs dfs -cat /tmp/parquet/par*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

.........................................
 org.apache.spark.sql.parquet.row.metadata?{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"string","nullable":true,"metadata":{}},{"name":"fv","type":"string","nullable":true,"metadata":{}}]}Iparquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)?PAR1


scala> val lines=spsession.read.parquet("/tmp/parquet/part*")
lines: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> lines.show
+---+--------+---+----+
| id|    name|age|  fv|
+---+--------+---+----+
|  2|zhangsan| 50| 866|
|  4|  laoliu|522|  30|
|  5|zhangsan| 20| 565|
|  6|    limi|522|  65|
|  1|   xiliu| 50|6998|
|  7|  llihmj| 23| 565|
+---+--------+---+----+

原文地址:https://www.cnblogs.com/playforever/p/7737675.html