Spark 学习(九) SparkSQL 函数自定义和数据源

一,简介

二,SparkSQL 的函数自定义

  2.1 函数定义

  2.2 函数注册

  2.3 示例

三,spark的数据源读取

  3.1 JSON

  3.2 JDBC

  3.3 ParQuet

  3.4 CSV

正文

一,简介

  很多时候sql中的内置函数无法满足我们的日常开发需求,这就需要我们进行函数的自定义。同时Spark的数据源来源广泛,如JSON,MYSQL等都可以作为我们的数据源。

二,SparkSQL 的函数自定义

  2.1 定义函数

val fun1 = (arg: String) => {
    arg + "aaa"
}

  2.2 注册函数

spark.udf.register("addString", fun1)

  2.3 示例

package cn.edu360.spark07

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object AutoFun {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .appName("sparkDateSet1")
            .master("local[2]")
            .getOrCreate()
        val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
        import spark.implicits._
        val words: Dataset[String] = lines.flatMap(_.split(" "))
        // 注册视图操作SQL形式
        words.createTempView("v_wc")
        // 定义函数
        val fun1 = (arg: String) => {
            arg + "aaa"
        }
        // 对函数进行注册
        spark.udf.register("addString", fun1)
        val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
        result.show()
        spark.stop()
    }
}

三,spark的数据源读取

  3.1 JSON

object JsonDataSource {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    //指定以后读取json类型的数据(有表头)
    val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
    val filtered: DataFrame = jsons.where($"age" <=500)
    filtered.printSchema()
    filtered.show()
    spark.stop()
    
  }
}

  3.2 JDBC

package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCSource {
    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("JdbcDataSource")
            .master("local[*]")
            .getOrCreate()
        import spark.implicits._
        val log: DataFrame = spark.read.format("jdbc").options(
            Map("url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
                "driver" -> "com.mysql.jdbc.Driver",
                "dbtable" -> "log",
                "user" -> "root",
                "password" -> "qwe123"
            )
        ).load()
        val result: DataFrame = log.select($"id", $"name", $"age")
        result.show()
        spark.stop()
    }
}

  3.3  ParQuet

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object ParquetDataSource {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("ParquetDataSource")
      .master("local[*]")
      .getOrCreate()
    //指定以后读取json类型的数据
    val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
    //val parquetLine: DataFrame = spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    //show是Action
    parquetLine.show()
    spark.stop()
  }
}

  3.4 CSV

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object CsvDataSource {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()
    //指定以后读取json类型的数据
    val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
    csv.printSchema()
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
    
  }
}
原文地址:https://www.cnblogs.com/tashanzhishi/p/10999613.html