Spark SQL DataFrame Operations

1. DataFrame overview:

 A DataFrame in Spark SQL is the equivalent of a relational database table. Queries and other table operations can all be performed through the DataFrame API.

Reference: http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame
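
For example, the same query can be written either as SQL or as DataFrame method calls (a minimal sketch; session is a SparkSession as built in the examples below, and the Hive table name is reused from section 2.3):

import org.apache.spark.sql.functions.col

// SQL style
val df1 = session.sql("select id, cust_num from sospdm.tmp_yinfei_test")

// Equivalent DataFrame API style
val df2 = session.table("sospdm.tmp_yinfei_test")
  .select("id", "cust_num")
  .where(col("id").isNotNull)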

2. Creating a DataFrame:

 2.1 From a Parquet file

val sparkConf = new SparkConf().setAppName("Test")
val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// filepath points to a Parquet file or directory
val df = session.read.parquet(filepath)
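
A DataFrame can be written back to Parquet through the matching writer interface (a minimal sketch; outputPath is a placeholder):

df.write.mode("overwrite").parquet(outputPath)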

 2.2 From a JSON file

  val sparkConf = new SparkConf().setAppName("Test")
  val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  // each line of the input file should be a complete JSON object
  val df = session.read.json(filepath)
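
The JSON reader infers the schema by scanning the input, so it is worth checking what was inferred before going further:

df.printSchema()  // inferred column names and types
df.show()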

 2.3 From an RDD

 2.3.1 Reflection approach -- case classes

package spark

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object Execute extends App {

  case class Person(id: String, cust_num: String)

  val sparkConf = new SparkConf().setAppName("Read")
  val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  // Run the Hive query and drop down to the underlying RDD[Row]
  val resultRDD = session.sql("select id,cust_num from sospdm.tmp_yinfei_test").rdd

  // Needed for the rdd.toDF() conversion below
  import session.implicits._

  // Map each Row to a Person; toDF() then derives the schema from the case class by reflection
  val result: DataFrame = resultRDD.map(row => {
    Person(row.getAs[String]("id"), row.getAs[String]("cust_num"))
  }).toDF()
  result.show()
}
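
The reflection approach is the more concise of the two: column names and types are taken straight from the Person case class, so it suits schemas that are fixed at compile time. When the columns are only known at runtime, use the programmatic approach below.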

 2.3.2 Programmatic approach -- creating the schema dynamically

package spark


import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object Execute extends App {

  val sparkConf = new SparkConf().setAppName("Read")
  val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  val resultRDD = session.sql("select id,cust_num from sospdm.tmp_yinfei_test").rdd

 
  // Rebuild each input Row as a generic Row, columns in schema order
  val result = resultRDD.map(row => {
    Row(row.getAs[String]("id"), row.getAs[String]("cust_num"))
  })

  // Build the schema explicitly: column name, data type, nullable flag
  val structType = StructType(Array(
    StructField("id", StringType, nullable = true),
    StructField("cust_num", StringType, nullable = false)))

  // Combine the Row RDD with the schema to create the DataFrame
  val resultDF = session.createDataFrame(result, structType)
  resultDF.show()
}

 2.4 Others: MySQL and similar databases
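
MySQL and similar databases are read through Spark's built-in JDBC source. A minimal sketch (the URL, table name, and credentials are placeholders, and the MySQL JDBC driver must be on the classpath):

val jdbcDF = session.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb")
  .option("dbtable", "person")
  .option("user", "root")
  .option("password", "secret")
  .load()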

3. DataFrame operations:

package spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}


object Test extends App {

  val sparkConf = new SparkConf().setAppName("Test")
  val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

  val resultDF1 = session.sql("select id1,cust_num1 from sospdm.tmp_yinfei_test_1")
  val resultDF2 = session.sql("select id2,cust_num2 from sospdm.tmp_yinfei_test_2")
  // Supported join types include: inner, outer, left_outer, right_outer, leftsemi
  val result: DataFrame = resultDF1.join(resultDF2, resultDF1("id1") === resultDF2("id2"), "inner")
  result.show()

}
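
Beyond joins, the same API covers the usual relational operations. A minimal sketch of projection, filtering, and aggregation (column names reused from resultDF1 above):

import org.apache.spark.sql.functions._

resultDF1
  .select("id1", "cust_num1")          // projection
  .where(col("cust_num1").isNotNull)   // filtering
  .groupBy("cust_num1")                // grouping
  .agg(count("id1").as("cnt"))         // count per group
  .show()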
Original post: https://www.cnblogs.com/yin-fei/p/10899777.html