Several ways to connect Spark to a MySQL database

1. The first way to connect Spark to MySQL:


import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder().master("local").appName("createdataframefrommysql")
    .config("spark.sql.shuffle.partitions", 1).getOrCreate()

  /**
    * The first way to read from MySQL
    */
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "123")
  val person: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark", "person", properties)

  person.show()
  spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark",
    "(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T",
    properties).show()


2. The second way to read MySQL data

val map: Map[String, String] = Map[String, String](
      "url" -> "jdbc:mysql://192.168.126.111:3306/spark",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "123",
      "dbtable" -> "score"
    )

    val score: DataFrame = spark.read.format("jdbc").options(map).load

    score.show()

3. The third way to read MySQL data

  val reader: DataFrameReader = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.126.111:3306/spark")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123")
      .option("dbtable", "score")

    val source2: DataFrame = reader.load()

    source2.show()

4. Writing Spark data out to MySQL

// Register the two tables above as temp views and run a join query across them
    person.createOrReplaceTempView("person")
    score.createOrReplaceTempView("score")

    val result = spark.sql("select person.id,person.name,person.age,score.score from person,score  where person.id=score.id ")

    result.show()

    // Save the query result into a MySQL table

    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result",properties)

An important parameter:

Parameter: spark.sql.shuffle.partitions sets the number of shuffle partitions used when a SQL statement is executed as a Spark job.
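For example (a minimal sketch; the value 1 matches the demo above, while Spark's default is 200):

// set it when the session is built ...
val spark = SparkSession.builder().master("local").appName("shufflePartitionsDemo")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()

// ... or change it at runtime before running a query
spark.conf.set("spark.sql.shuffle.partitions", 200)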

Loading Hive data into a DataFrame with Spark SQL

Configure Spark so it can find Hive:

1. Start the Hive metastore:  hive --service metastore &

2. Copy the Hive config file from the mynode3 node to the Spark conf directory:  scp ./hive-site.xml mynode4:/software/spark-2.3.1/conf/

3. Edit hive-site.xml: delete all of the other settings and keep only

<configuration>
 <property>
  <name>hive.metastore.uris</name>
  <value>thrift://mynode1:9083</value>
 </property>
</configuration>
What this file does: it lets Spark find Hive's metastore, and once Spark can find the metadata it can find the Hive data.
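Equivalently (a minimal sketch, assuming the metastore from the config above is listening at thrift://mynode1:9083), the metastore URI can also be passed to the SparkSession builder directly:

val spark = SparkSession.builder()
  .appName("hiveDemo")
  .config("hive.metastore.uris", "thrift://mynode1:9083")  // same value as in hive-site.xml
  .enableHiveSupport()
  .getOrCreate()

spark.sql("show databases").show()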

------------------------------------------------------------------------------------------------------------------------

Querying Hive data with spark-sql:

1. Start Hadoop   2. Start Hive   3. Start Spark: in /software/spark-2.3.1/sbin/ run ./start-all.sh

./spark-shell --master spark://mynode1:7077,mynode2:7077   -- start a spark-shell against the cluster with this command
spark.sql("show databases").show()


Use MapReduce and Spark SQL to compare query speed over the same batch of data.

When Spark code runs locally, it will always fail if no master is set on the SparkSession (via .master()).
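A minimal sketch ("local[*]" is just one possible local master):

// local runs need an explicit master; on a cluster, spark-submit supplies it instead
val spark = SparkSession.builder()
  .master("local[*]")   // omit this line when submitting to a cluster
  .appName("localRun")
  .getOrCreate()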

Reading Hive data with Spark SQL:

 val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
    spark.sql("use spark")
    spark.sql("drop table if exists student_infos")
    spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '	'")
    spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")
    spark.sql("drop table if exists student_scores")
    spark.sql("create table if not exists student_scores (name string,age int) row format delimited fields terminated by '	'")
    spark.sql("load data local inpath'/root/test/student_scores' into table student_scores")
    val frame: DataFrame = spark.table("student_infos")

    frame.show()

    val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    df.show()

    spark.sql("drop table if exists good_student_infos")

    /**
      * Save the result to a Hive table
      */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

Package the read-Hive-with-Spark project with Maven: run clean first (the project's old target directory disappears), then run package to build the jar.

Upload the packaged jar to the Linux server, then use the following command to read the Hive data and create or drop a table in Hive.

In the Spark bin directory:

./spark-submit --master spark://mynode1:7077,mynode2:7077 --class packageName.ClassName /path/to/the/jar/on/the/server

UDF: user-defined function

A UDF is one-to-one: it reads one row of input and produces one value of output.

Register a UDF:  spark.udf.register("udfName", function)

Use a UDF:  sparkSession.sql("select xx, udfName(xx) from tableName ...")

Example code:

 val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()

    val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhouyu", "lili")

    import spark.implicits._

    val nameDF: DataFrame = nameList.toDF("name")
    nameDF.createOrReplaceTempView("students")
    nameDF.show()

    /**
      * Register the user-defined function
      */
    spark.udf.register("STRLEN", (name: String) => name.length)

   spark.sql("select name,STRLEN(name) as length from  students order by length desc").show(100)

UDAF: user-defined aggregate function

       A UDAF is written as a class that extends UserDefinedAggregateFunction.

       Extending that class requires implementing eight methods; what each one does:

       initialize : 1. On the map side, rows inside each RDD partition are grouped by the group-by fields, and every group gets an initial value
                    2. On the reduce side, every group-by group also gets an initial value

       update : when a new input value arrives for a group, fold it into that group's aggregate value

       merge : in the reduce stage, merge incoming partial aggregates into the group's aggregate

       inputSchema : the data types of the values passed into the aggregation

       bufferSchema : the data types of the intermediate buffer used while aggregating

       dataType : the data type of the function's final return value

       deterministic : whether the same input always produces the same output across runs

       evaluate : returns the final aggregate value, which must match the type declared by dataType

    UDAF (user-defined aggregate function): its main purpose is to let you control exactly how the aggregation is carried out; depending on the business requirement, you override the eight methods of the inherited class accordingly.
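A minimal sketch of such a class (an average over a double column; the class name MyAverage and the column names are illustrative, assuming the Spark 2.x UserDefinedAggregateFunction API):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyAverage extends UserDefinedAggregateFunction {
  // data type of the input values fed to the aggregation
  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  // data types of the intermediate buffer: running sum and count
  override def bufferSchema: StructType =
    StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
  // data type of the final return value
  override def dataType: DataType = DoubleType
  // the same input always produces the same output
  override def deterministic: Boolean = true
  // initial buffer values for every group (map side and reduce side)
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0L
  }
  // fold one new input row into the group's buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1L
    }
  }
  // merge two partial buffers in the reduce stage
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // final aggregate value; must match dataType
  override def evaluate(buffer: Row): Any =
    if (buffer.getLong(1) == 0L) 0.0 else buffer.getDouble(0) / buffer.getLong(1)
}

It is registered and used like a UDF, e.g. spark.udf.register("myAvg", new MyAverage), then "select leibie, myAvg(jine) from sales group by leibie".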

    Window (over) functions:

   

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("over").enableHiveSupport().getOrCreate()
    spark.sql("use spark")
    spark.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '	'")
    spark.sql("load data local inpath '/root/test/sales' into table sales")

    /**
      * rank starts from 1 within each group
      *   5 A 200   --- 1
      *   3 A 100   ---2
      *   4 A 80   ---3
      *   7 A 60   ---4
      *
      *   1 B 100   ---1
      *   8 B 90  ---2
      *   6 B 80  ---3
      *   1 B 70  ---4
      */
    val result = spark.sql(
      "select"
        +" riqi,leibie,jine "
        + "from ("
        + "select "
        +"riqi,leibie,jine,row_number() over (partition by leibie order by jine desc) rank "
        + "from sales) t "
        + "where t.rank<=3")
    result.write.mode(SaveMode.Append).saveAsTable("salesResult")
    result.show(100)

 

Reading MySQL data in Java:

SparkConf conf =  new SparkConf();
        conf.setMaster("local").setAppName("mysql");
        conf.set("spark.sql.shuffle.partitions","1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Map<String,String> map = new HashMap<String,String>();
        map.put("driver","com.mysql.jdbc.Driver");
        map.put("url","jdbc:mysql://192.168.126.111:3306/spark");
        map.put("user","root");
        map.put("password","123");
        map.put("dbtable","person");

        Dataset<Row> df = sqlContext.read().options(map).format("jdbc").load();
        //df.show();
        df.registerTempTable("person1");

        /**
         * The second way to connect via JDBC
         */

        DataFrameReader read = sqlContext.read();
        read.option("driver","com.mysql.jdbc.Driver");
        read.option("url","jdbc:mysql://192.168.126.111:3306/spark");
        read.option("user","root");
        read.option("password","123");
        read.option("dbtable","score");
        Dataset<Row> jdbc = read.format("jdbc").load();
        jdbc.registerTempTable("score1");
        Dataset<Row> result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1 join  score1 on  person1.name = score1.name ");
        result.show();
        Properties prop = new Properties();
        prop.put("user","root");
        prop.put("password","123");

        result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result1",prop);
        //jdbc.show();
        sc.stop();

Reading Hive data in Java:

SparkConf conf = new SparkConf();
        conf.setAppName("hive");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(sc);
        //create the tables and load the data
        hc.sql("use spark");
        hc.sql("create table student_info(name string,age int) row format delimited fields terminated by ','");
        hc.sql("load data local inpath '/root/data/student_infos' into table student_info");
        
        hc.sql("create table student_scores(name string,score int) row format delimited fields terminated by ','");
        hc.sql("load data local inpath '/root/data/student_scores' into table student_score");
        //得到表连接结果 
        Dataset<Row> sql = hc.sql("select t1.name,t1.age,t2.score from student_info t1 join student_score t2 on t1.name = t2.name");
        //将结果写回到hive 
        sql.write().mode(SaveMode.Overwrite).saveAsTable("student_result");

 

  

Original article: https://www.cnblogs.com/wcgstudy/p/10984550.html