DataFram 与RDD转换

  • 文件内容:

Michael, 29
Andy, 30
Justin, 19
  • 方法一:反射,通过RDD[CaseClass]

  • 自定义 CaseClass
case class Person(name: String, age: Int)



val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
  • 转成DF以后,就相当于有了schema,就可以当成table啦
scala> peopleDF.select("name", "age").show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
  • 方法二: 通过RDD[Row]+Schema

//step1: 从原来的 RDD 创建一个行的 RDD
    val peopleRow = sc.textFile("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Row(p(0), p(1)))
//step2: 创建由一个 StructType 表示的模式, 并且与第一步创建的 RDD 的行结构相匹配
 //构造schema用到了两个类StructType和StructFile,其中StructFile类的三个参数分别是(字段名称,类型,数据是否可以用null填充)
    val schema = StructType(Array(StructField("name", StringType, true), StructField("age", IntegerType, true)))
 //step3.在行 RDD 上通过 createDataFrame 方法应用模式
    val people = spark.createDataFrame(peopleRow, schema)
    people.registerTempTable("peopleTable")

  

原文地址:https://www.cnblogs.com/xuziyu/p/11133436.html