转】RDD与DataFrame的转换

  原博文出自于:  http://www.cnblogs.com/namhwik/p/5967910.html

RDD与DataFrame转换
1. 通过反射的方式来推断RDD元素中的元数据。因为RDD本身一条数据本身是没有元数据的,例如Person,而Person有name,id等,而record是不知道这些的,但是变成DataFrame背后一定知道,通过反射的方式就可以了解到背后这些元数据,进而转换成DataFrame。
如何反射?
Scala: 通过case class映射,在case class里面说我们这个RDD里面每个record的不同列的元数据是什么。(废弃)
当样本类不能提前确定时(例如,当记录的结构由字符串或文本数据集编码而成,它在解析时,字段将会对不同的用户有不同的投影结果),SchemaRDD 可以由以下三个步骤创建: 
当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

复制代码

 //   从原来的RDD创建一个Row格式的RDD
 //    创建与RDD 中Rows结构匹配的StructType,通过该StructType创建表示RDD 的Schema
 //   通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD 的Schema
val conf = new SparkConf().setMaster ("local").setAppName ("Test1") val sc = new SparkContext (conf) val sqlContext = new SQLContext(sc) // import sqlContext.implicits._ case class Person(name:String,age:Int) val people = sc.textFile ("d:/people.txt") val schemaString = "name age" val schema = StructType ( schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)) ) val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) val peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema) peopleSchemaRDD .registerTempTable("people" ) val results = sqlContext . sql ("SELECT name FROM people" ) results.printSchema() println(results.count()) results.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码


//1.利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。

复制代码
//2.   先创建一个bean类,然后将Rdd转换成DataFrame
 case class Person(name: String, age: Int)
  def main (args : Array[String]) : Unit =
  {
    val conf = new SparkConf().setMaster ("local").setAppName ("Test1")
    val sc = new SparkContext (conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val people = sc.textFile("d:/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
复制代码
原文地址:https://www.cnblogs.com/zlslch/p/6040469.html