sparkSQL-2

SparkSQL-2

1.前言

sparksql-1

2、JDBC数据源

sparksql可以从mysql表中加载大量的数据,然后进行相应的统计分析查询,也可以把最后得到的结果数据写回到mysql表
2.1 通过sparksql加载mysql表中的数据
  • 代码开发
package cn.doit.sparksql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:利用sparksql加载mysql表中的数据
object DataFromMysql {

  def main(args: Array[String]): Unit = {
      //1、创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")

     //2、创建SparkSession对象
      val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()


    //3、读取mysql表的数据
          //3.1 指定mysql连接地址
            val url="jdbc:mysql://node1:3306/spark?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT%2B8"
          //3.2 指定要加载的表名
            val tableName="iplocation"
         // 3.3 配置连接数据库的相关属性
             val properties = new Properties()
             //用户名
              properties.setProperty("user","root")
             //密码
              properties.setProperty("password","123456")

      val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
    
      //打印schema信息
      mysqlDF.printSchema()

       //展示数据
      mysqlDF.show()
    
       //把dataFrame注册成表
    mysqlDF.createTempView("iplocation")

    spark.sql("select * from iplocation where total_count >1500").show()

    spark.stop()
  }
}

2.2 通过sparksql把分析到的结果数据保存到mysql表中
  • 代码开发
package cn.doit.sparksql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
object DataSaveMysql {

  def main(args: Array[String]): Unit = {
      //1、创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql").setMaster("local[2]")

     //2、创建SparkSession对象
      val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()


    //3、读取mysql表的数据
          //3.1 指定mysql连接地址
            val url="jdbc:mysql://node1:3306/spark"
          //3.2 指定要加载的表名
            val tableName="iplocation"
         // 3.3 配置连接数据库的相关属性
             val properties = new Properties()
             //用户名
              properties.setProperty("user","root")
             //密码
              properties.setProperty("password","123456")

      val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)

      //打印schema信息
      //mysqlDF.printSchema()

       //展示数据
      //mysqlDF.show()

       //把dataFrame注册成表
    mysqlDF.createTempView("iplocation")

    //把数据接受到之后,进行进行对应的分析
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")

    //保存result结果数据到mysql表中
     val destTable="result2"
     //数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
      //overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
      //append: 表示追加,如果表事先不存在,它先帮忙我们创建
      //ignore: 表示忽略 如果表事先存在,就不进行任何操作
      //error :表示报错, 如果表事先存在就报错(默认选项)

    result.write.mode("ignore").jdbc(url,destTable,properties)

    spark.stop()


  }
}

  • 后期可以把程序改造一下,打成jar包提交到集群中运行
  package cn.doit.sparksql
  

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
object DataSaveMysql {

  def main(args: Array[String]): Unit = {
      //1、创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql")

     //2、创建SparkSession对象
      val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()


    //3、读取mysql表的数据
          //3.1 指定mysql连接地址
            val url="jdbc:mysql://node1:3306/spark"
          //3.2 指定要加载的表名
            val tableName=args(0)
         // 3.3 配置连接数据库的相关属性
             val properties = new Properties()
             //用户名
              properties.setProperty("user","root")
             //密码
              properties.setProperty("password","123456")
    
      val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
    
      //打印schema信息
      //mysqlDF.printSchema()
    
       //展示数据
      //mysqlDF.show()
    
       //把dataFrame注册成表
    mysqlDF.createTempView("iplocation")
    
    //把数据接受到之后,进行进行对应的分析
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")
    
    //保存result结果数据到mysql表中
     val destTable=args(1)
     //数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
      //overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
      //append: 表示追加,如果表事先不存在,它先帮忙我们创建
      //ignore: 表示忽略 如果表事先存在,就不进行任何操作
      //error :表示报错, 如果表事先存在就报错(默认选项)
    
    result.write.mode("append").jdbc(url,destTable,properties)
    
    spark.stop()


  }
}

  • 任务的提交脚本
spark-submit 
--master spark://node1:7077 
--class cn.doit.sparksql.DataSaveMysql 
--executor-memory 1g 
--total-executor-cores 4 
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar 
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar 
spark_doit_class08-1.0-shaded.jar 
iplocation doit

--driver-class-path:它把对应的jar包下发到Driver端
--jars:它把对应的jar包下发到每一个executor进程

3、sparksql保存数据处理结果的操作

  • 代码开发
package cn.doit.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:sparksql可以把结果数据保存到不同的外部存储介质中
object SaveResult {

  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("SaveResult").setMaster("local[2]")

    //2、创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //3、加载数据源
      val jsonDF: DataFrame = spark.read.json("E:\data\score.json")

    //4、把DataFrame注册成表
      jsonDF.createTempView("t_score")

    //todo:5、统计分析
      val result: DataFrame = spark.sql("select * from t_score where score > 80")

        //保存结果数据到不同的外部存储介质中
        //todo: 5.1 保存结果数据到文本文件  ----  保存数据成文本文件目前只支持单个字段,不支持多个字段
        //result.select("name").write.text("./data/result/123.txt")

        //todo: 5.2 保存结果数据到json文件
        //result.write.json("./data/json")

        //todo: 5.3 保存结果数据到parquet文件
        //result.write.parquet("./data/parquet")

        //todo: 5.4 save方法保存结果数据,默认的数据格式就是parquet
        //result.write.save("./data/save")

        //todo: 5.5 保存结果数据到csv文件
        //result.write.csv("./data/csv")

        //todo: 5.6 保存结果数据到表中
        //result.write.saveAsTable("t1")

        //todo: 5.7  按照单个字段进行分区 分目录进行存储
        //result.write.partitionBy("classNum").json("./data/partitions")

        //todo: 5.8  按照多个字段进行分区 分目录进行存储
       result.write.partitionBy("classNum","name").json("./data/numPartitions")


    spark.stop()
  }

}

4、spark的窗口函数

  • 代码开发
package cn.doit.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkTopN {

  def main(args: Array[String]): Unit = {
    //1、构建SparkSession
      val sparkSession: SparkSession = SparkSession.builder().appName("SparkTopN").master("local[2]").getOrCreate()
    sparkSession.sparkContext.setLogLevel("warn")
    //2、加载json数据源
      val jsonDF: DataFrame = sparkSession.read.json("E:\data\score.json")

    //3、注册成表
      jsonDF.createTempView("user_score")

    //4、统计分析
    //todo:4.1 取出每一个班级中学生成绩最高的
     sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn  from user_score) t where t.rn=1" ).show()

    //todo:4.2 取出每一个班级中学生成绩最高的前2位
    sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn  from user_score) t where t.rn <=2" ).show()

    //todo: 4.3 rank 可以跳跃进行排序,如果分数相同,这里的编号是一样,并且下面直接跳过
    sparkSession.sql("select *,rank() over(partition by classNum order by score desc) as rn  from user_score" ).show()

    //todo: 4.4 dense_rank 不跳跃进行排序,如果分数相同,这里的编号是一样,依次连续
    sparkSession.sql("select *,dense_rank() over(partition by classNum order by score desc) as rn  from user_score" ).show()


    sparkSession.stop()

  }
}

5、sparksql中自定义函数

5.1 自定义UDF函数(一对一的关系)
  • 代码开发
package cn.doit.sparksql

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}

//TODO:自定义sparksql的UDF函数    一对一的关系
object SparkSQLFunction {

  def main(args: Array[String]): Unit = {
    //1、创建SparkSession
      val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()

    //2、构建数据源生成DataFrame
       val dataFrame: DataFrame = sparkSession.read.text("E:\data\test_udf_data.txt")

    //3、注册成表
       dataFrame.createTempView("t_udf")


    //4、实现自定义的UDF函数

    //小写转大写
     sparkSession.udf.register("low2Up",new UDF1[String,String]() {
       override def call(t1: String): String = {
           t1.toUpperCase
       }
     },StringType)

    //大写转小写
    sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)


    //4、把数据文件中的单词统一转换成大小写
      sparkSession.sql("select  value from t_udf").show()
      sparkSession.sql("select  low2Up(value) from t_udf").show()
      sparkSession.sql("select  up2low(value) from t_udf").show()


    sparkSession.stop()

  }
}

  • 接收2个Decimal参数,进行相加
spark.udf.register("addRR2", (x:Double,y:Double) => {
      x + y
    })
// UNIX_TIMESTAMP 将GMT 时间转换成时间戳
val result: DataFrame = spark.sql("select addRR2(nni_50,pnni_50) as nni, UNIX_TIMESTAMP(create_time) as create_time, UNIX_TIMESTAMP(update_time) as update_time, status,mean_nni from hrv where mean_nni > 800")
result.show()
5.2 自定义UDAF聚合函数(多对一的关系)
  • 代码开发
package cn.doit.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:自定义UDAF函数----->多行输入一个输出
object SparkSQLUDAFFunction {

  def main(args: Array[String]): Unit = {
    //1、创建SparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLUDAFFunction").master("local[2]").getOrCreate()

   //2、读取数据文件
        val dataFrame: DataFrame = sparkSession.read.json("E:\data\test_udaf_data.json")

   //3、注册成表
         dataFrame.createTempView("t_udaf")

    //自定义一个udaf函数
      sparkSession.udf.register("avgSal",new MyUdafFunction)

   //4、通过sparksql执行sql语句,求出用户的平均薪水
     sparkSession.sql("select avgSal(salary) from t_udaf").show()


    sparkSession.stop()
  }
}

  • 自定义UDAF类
package cn.doit.sparksql

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUdafFunction  extends UserDefinedAggregateFunction{
   //输入数据的类型
  override def inputSchema: StructType ={
    StructType(StructField("input",LongType)::Nil)
  }


  //缓存区的数据类型 ,指定后期使用类似于2个字段进行对应的除法运行的字段类型   sum     countNum
  override def bufferSchema: StructType ={
    StructType(StructField("sum",LongType)::StructField("countNum",LongType)::Nil)
  }

  //最后的结果数据类型
  override def dataType: DataType ={
     DoubleType
  }

   //表示相同的输入是否得到相同的结果数
  override def deterministic: Boolean ={
      true
  }

  //给定数据的初始化值
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
        //给定金额字段的初始值  sum
        buffer(0)=0L

        //给定表的条数的初始值  countNum
        buffer(1)=0L
  }

  //更新数据
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
    //sum字段的结果数据
    buffer(0)=buffer.getLong(0) +  input.getLong(0)

    //countNum字段的结果数据
    buffer(1)=buffer.getLong(1) + 1

  }

   //由于程序要进行分布式计算,需要把每个分区处理的结果数据最后进行合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit ={
      //统计所有用户的总金额
     buffer1(0)=buffer1.getLong(0) +buffer2.getLong(0)
      //统计一共有多少个用户
     buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }

  //统计求平均薪水
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble  / buffer.getLong(1)

  }
}

6、sparksql整合hive

  • 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中

  • 2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中

  • 3、可以使用spark-sql脚本 后期执行sql相关的任务

    spark-sql 
    --master spark://node1:7077 
    --executor-memory 1g 
    --total-executor-cores 4 
    --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse 
    
    
    #!/bin/sh
    #定义sparksql提交脚本的头信息
    SUBMITINFO="spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse" 
    #定义一个sql语句
    SQL="select * from employee;" 
    #执行sql语句   类似于 hive -e sql语句
    echo "$SUBMITINFO" 
    echo "$SQL"
    $SUBMITINFO -e "$SQL"
    
    set fileformat  查看文件格式 dos/unix
    set fileformat=unix  修改文件格式
    

7、使用sparksql实现ip地址查询

  • 代码开发
package cn.doit.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:通过开发sparksql程序来实现ip地址查询
object SparkSqlIplocation {

  def main(args: Array[String]): Unit = {
    //1、构建SparkSession
       val sparkSession: SparkSession = SparkSession.builder().appName("SparkSqlIplocation").master("local[2]").getOrCreate()

    //2、读取文件数据
       val sc: SparkContext = sparkSession.sparkContext
         sc.setLogLevel("warn")
       //城市ip信息
      val city_ip_rdd: RDD[(Long, Long, String, String)] = sc.textFile("E:\data\ip.txt").map(x=>x.split("\|")).map(x=>(x(2).toLong,x(3).toLong,x(x.length-2),x(x.length-1)))

      //运营商日志数据
      val ipsRDD: RDD[String] = sc.textFile("E:\data\20090121000132.394251.http.format").map(x=>x.split("\|")(1))
      //把ipsRDD中的每一个ip转换成Long类型
      val ipLongRDD: RDD[Long] = ipsRDD.map(ip => {
        //ip转换Long类型
        val split: Array[String] = ip.split("\.")
        var ipNum = 0L
        for (i <- split) {
          ipNum = i.toLong | ipNum << 8L
        }
        ipNum
      })


   //3、分别把对应的rdd数据转换成dataFrame
         import sparkSession.implicits._
        //在toDF方法中可以手动指定字段的名称
        val cityIpDF: DataFrame = city_ip_rdd.toDF("ipStart","ipEnd","longitude","latitude")
        val ipsDF: DataFrame = ipLongRDD.toDF("ip")

   //4、注册成表
       cityIpDF.createTempView("t_city_ip")
       ipsDF.createTempView("t_user_ip")

  //5、统计每一个经纬度出现的总次数
      val result: DataFrame = sparkSession.sql("select t1.longitude,t1.latitude,count(*) as num  from t_city_ip t1 join  t_user_ip t2 on t2.ip between t1.ipStart and t1.ipEnd group by t1.longitude,t1.latitude")
      result.show()

    sparkSession.stop()

  }
}
原文地址:https://www.cnblogs.com/xujunkai/p/15007542.html