spark-sql-02

  def main(args: Array[String]): Unit = {

    // Local-mode configuration; the application is registered under the name "test".
    val conf: SparkConf = new SparkConf()
      .setAppName("test")
      .setMaster("local")
    // Obtain the session built from `conf` and the SparkContext that belongs to it.
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = session.sparkContext
    // Set Spark's log level to ERROR to keep the console output readable.
    sc.setLogLevel("ERROR")

    // DataFrame = data + metadata (the schema).
    // A Spark Dataset can be processed like a collection (RDD-style methods) or through
    // operations defined in the SQL-like domain language.
    // Reading through sc.textFile gives an RDD[String]; this choice determines the types
    // used below (a Dataset could be read instead).
    val dataLists: RDD[String] = sc.textFile("data/person.txt")
    // Split each line on a single space and build a Row of (first field, second field as Int).
    val rddRow = dataLists.map { line =>
      val arr = line.split(" ")
      Row(arr(0), arr(1).toInt)
    }
    
    // Column definitions matching the Rows built above: a nullable string "name"
    // and a nullable integer "age".
    val fields = Array(
      StructField("name", DataTypes.StringType, nullable = true),
      StructField("age", DataTypes.IntegerType, nullable = true)
    )

    // Combine the RDD[Row] with the explicit schema to obtain a DataFrame.
    val schema = StructType(fields)
    val dataFrame = session.createDataFrame(rddRow, schema)

    dataFrame.show()

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
|    lisi| 22|
|  wangwu| 99|
|  xiaoke| 22|
+--------+---+



    // Print the column names, types and nullability of the DataFrame.
    dataFrame.printSchema()

root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)



    // Register the DataFrame under the table name "person" so SQL can refer to it.
    // createOrReplaceTempView is used because createTempView throws when a view with
    // the same name already exists in the session (for example when the code is re-run).
    dataFrame.createOrReplaceTempView("person")

    session.sql("select * from person where name = 'xiaoke'").show()

+------+---+
|  name|age|
+------+---+
|xiaoke| 22|
+------+---+

动态获取类型

   // Schema description: one "<columnName> <typeName>" entry per column, in column order.
   val userSchema = Array("name string", "hobby string", "sex int")

    /** Converts one raw field to the type declared for its column in `userSchema`.
      *
      * The result type is `Any` because it depends on the column. A type name other
      * than "string" or "int" is not handled by the match.
      *
      * @param info (raw text, column index), e.g. ("zhangsan", 0) or ("18", 2)
      * @return the text for a "string" column, or the parsed Int for an "int" column
      */
    def toDataType1(info: (String, Int)): Any = {
      val columnIndex = info._2
      // Each schema entry is "<columnName> <typeName>"; the second token is the type name.
      val typeName = userSchema(columnIndex).split(" ")(1)
      typeName match {
        case "string" => info._1.toString
        case "int" => info._1.toInt
      }
    }

    // Example input line: zhangsan lanqiu 18
    val rowRdd = dataLists.map { line =>
      // Pair every field with its column index: [(zhangsan, 0), (lanqiu, 1), (18, 2)]
      val indexedFields = line.split(" ").zipWithIndex
      // Convert each field to the type its column declares, then wrap the values in a Row.
      val typedValues = indexedFields.map(pair => toDataType1(pair))
      Row.fromSeq(typedValues)
    }

    // A DataFrame needs two ingredients: 1. an RDD  2. a StructType.
    /** Maps a type name used in `userSchema` ("string" or "int") to the Spark SQL data type.
      * Any other name is not handled by the match.
      */
    def getDataType2(v: String) = v match {
      case "string" => DataTypes.StringType
      case "int" => DataTypes.IntegerType
    }
    // Build one StructField per "<columnName> <typeName>" entry of userSchema.
    val fields = userSchema.map { entry =>
      val parts = entry.split(" ")
      StructField(parts(0), getDataType2(parts(1)))
    }
    val scheme = StructType(fields)
    // Pair the dynamically typed rows with the dynamically built schema.
    val dataFrame = session.createDataFrame(rowRdd, scheme)
    dataFrame.show()

+--------+-------+---+
|    name|  hobby|sex|
+--------+-------+---+
|zhangsan|     PB| 18|
|    lisi|xiangqi| 22|
|  wangwu| lanqiu| 99|
|  xiaoke|    wan| 22|
+--------+-------+---+


    // Print the schema that was derived from userSchema.
    dataFrame.printSchema()

root
|-- name: string (nullable = true)
|-- hobby: string (nullable = true)
|-- sex: integer (nullable = true)

 

使用实体类进行映射类型

/** Bean-style record with a name and an age, used below with
  * `createDataFrame(..., classOf[Person])`.
  *
  * `@BeanProperty` generates getName/setName and getAge/setAge for the two vars.
  */
class Person extends Serializable {
  @BeanProperty var name: String = ""
  @BeanProperty var age: Int = 0
}


    val rdd: RDD[String] = sc.textFile("data/person.txt")
    // Allocate a fresh Person for every record. A single mutable instance created
    // outside the closure and updated per record would make every element of the RDD
    // refer to the same object, so anything that holds on to the elements
    // (cache, collect, ...) would only see the values written last.
    val rddBean: RDD[Person] = rdd.map(_.split(" "))
      .map { arr =>
        val p = new Person
        // First field is the name, third field is the age.
        p.setName(arr(0))
        p.setAge(arr(2).toInt)
        p
      }
    // Build the DataFrame from the bean class instead of an explicit StructType.
    val df = session.createDataFrame(rddBean, classOf[Person])
    df.show()
    df.printSchema()
+---+--------+
|age|    name|
+---+--------+
| 18|zhangsan|
| 22|    lisi|
| 99|  wangwu|
| 22|  xiaoke|
+---+--------+

root
 |-- age: integer (nullable = false)
 |-- name: string (nullable = true)
    // Read the file as a typed Dataset[String], map it to (name, age) tuples,
    // then convert the result to a DataFrame with named columns.
    val ds01: Dataset[String] = session.read.textFile("data/person.txt")
    // Passing the encoder explicitly takes the place of the implicit encoders that
    // `import session.implicits._` would provide.
    val tupleEncoder = Encoders.tuple(Encoders.STRING, Encoders.scalaInt)
    // Keep the first field as the name and parse the third field as the age.
    val toNameAndAge: String => (String, Int) = line => {
      val strs: Array[String] = line.split(" ")
      (strs(0), strs(2).toInt)
    }
    val person: Dataset[(String, Int)] = ds01.map(toNameAndAge)(tupleEncoder)

    val cperson: DataFrame = person.toDF("name", "age")
    cperson.show()
    cperson.printSchema()

// dataFrame操作API   Dataset操作sql

    // The session implicits provide `.toDF` on local collections.
    import session.implicits._
    val dataDF: DataFrame = List(
      "hello world",
      "hello world",
      "hello msb",
      "hello world",
      "hello world",
      "hello spark",
      "hello world",
      "hello spark"
    ).toDF("line")

    // Register the table name "ooxx". createOrReplaceTempView is used because
    // createTempView throws when a view with the same name already exists in the
    // session (for example when the code is re-run).
    dataDF.createOrReplaceTempView("ooxx")
    val df: DataFrame = session.sql("select * from ooxx")
    df.show()

+-----------+
|       line|
+-----------+
|hello world|
|hello world|
|  hello msb|
|hello world|
|hello world|
|hello spark|
|hello world|
|hello spark|
+-----------+


    // Print the schema of the query result (a single "line" column).
    df.printSchema()

root
|-- line: string (nullable = true)

// SQL-oriented word count: explode(split(...)) turns each line into one row per word,
// then the rows are grouped and counted per word.
    val wordCountSql = " select word, count(*) from   (select explode(split(line,' ')) as word from ooxx) as tt   group by tt.word  "
    session.sql(wordCountSql).show()

+-----+--------+
| word|count(1)|
+-----+--------+
|hello|       8|
|spark|       2|
|world|       5|
|  msb|       1|
+-----+--------+

// API-oriented word count: the DataFrame itself plays the role of the `from <table>` clause.
    val words = dataDF.selectExpr("explode(split(line, ' ')) as word")
    val res = words.groupBy("word").count()

    // Write the counts as parquet in Append mode.
    val appendWriter = res.write.mode(SaveMode.Append)
    appendWriter.parquet("data/out/ooxx")

    // Read the parquet output back and display it.
    val frame: DataFrame = session.read.parquet("data/out/ooxx")
    frame.show()

+-----+-----+
| word|count|
+-----+-----+
|hello|    8|
|spark|    2|
|world|    5|
|  msb|    1|
+-----+-----+


    // Print the schema of the data read back from parquet.
    frame.printSchema()

root
|-- word: string (nullable = true)
|-- count: long (nullable = true)



   /*
    基于文件的形式:
    session.read.parquet()
    session.read.textFile()
    session.read.json()
    session.read.csv()
    读取任何格式的数据源都要转换成DF
    res.write.parquet()
    res.write.orc()
    res.write.text()
    */
 
    基于文件的形式:
    session.read.parquet()
    session.read.textFile()
    session.read.json()
    session.read.csv()


    读取任何格式的数据源都要转换成DF
    res.write.parquet()
    res.write.orc()
    res.write.text()
 
原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14490322.html