spark读文件写入mysql(scala版本)

 1 package com.zjlantone.hive
 2 
 3 import java.util.Properties
 4 
 5 import com.zjlantone.hive.SparkOperaterHive.sparkSession
 6 import org.apache.spark.rdd.RDD
 7 import org.apache.spark.sql.types.StructType
 8 import org.apache.spark.{SparkConf, SparkContext}
 9 import org.apache.spark.sql._
10 case class ManxingweiyanLis(diseaseName: String,cardId: String, lisName: String,lisResult:String,lisAndResult:String)
11 object jangganHive {
12   val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName)
13   val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
14   val url = "jdbc:mysql://192.168.4.732:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
15   def main(args: Array[String]): Unit = {
16     assc
17     sparkSession.stop()
18   }
19 
20   def assc: Unit = {
21     import sparkSession.implicits._
22     import sparkSession.sql
23     val df: DataFrame = sql("select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet !="" and lisName !="清洁度"")
24     val rdd: RDD[Row] = df.rdd
25     //计算化验结果
26     val operatorLis: RDD[(String, String)] = rdd.map(row => {
27       var i = ""
28       val cardID: String = row.get(0).toString
29       val lisName: String = row.get(1).toString
30       try {
31         val lisResult: String = row.get(2).toString
32         val lisBet: String = row.get(3).toString
33         if (lisResult.contains("+")) {
34           (cardID + "&" + lisName, "阳性")
35         } else if(lisResult.contains("阴性") || lisResult.contains("-")){
36           (cardID + "&" + lisName, "阴性")
37         }else {
38           val splits: Array[String] = lisBet.split("-|-")
39           if (lisResult.toDouble > splits(1).toDouble) {
40             i = "升高"
41           } else if (lisResult.toDouble < splits(0).toDouble) {
42             i = "降低"
43           }else{
44             i="正常"
45           }
46           (cardID + "&" + lisName, i)
47         }
48       } catch {
49         case e: Exception => {
50           (cardID + "&" + lisName, "数据异常")
51         }
52       }
53     })
54 
55     val frame: DataFrame = operatorLis.map(x => {
56       ManxingweiyanLis("高脂血症",x._1.split("&")(0), x._1.split("&")(1), x._2,x._1.split("&")(1)+x._2)
57     }).toDF()
58     val proprttity=new Properties()
59     proprttity.put("user", "root")
60     proprttity.put("password", "123456")
61     proprttity.put("driver", "com.mysql.jdbc.Driver")
62     frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", proprttity)
63   }
64 }
原文地址:https://www.cnblogs.com/kwzblog/p/10191615.html