Writing Data to MySQL with Spark Streaming

(1) Add the following dependencies to pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>
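
The commons-dbcp artifact is declared here but never actually used by the hand-rolled pool in step (3). If you prefer to lean on it instead, a minimal sketch of a DBCP-backed pool might look like the following (the object name DbcpConnectionPool and the pool settings are illustrative, not from the original post):

import java.sql.Connection
import org.apache.commons.dbcp.BasicDataSource

// Hypothetical DBCP-backed alternative to the hand-rolled pool in step (3).
object DbcpConnectionPool {
  private val dataSource = new BasicDataSource()
  dataSource.setDriverClassName("com.mysql.jdbc.Driver")
  dataSource.setUrl("jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true")
  dataSource.setUsername("root")
  dataSource.setPassword("root")
  dataSource.setMaxActive(5) // cap at 5 connections, matching the hand-rolled pool

  // DBCP recycles a borrowed connection when conn.close() is called,
  // so no explicit returnConnection method is needed.
  def getConnection: Connection = dataSource.getConnection
}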
 

(2) Create the database and table in MySQL from the command line:

mysql -uroot -p
create database test;
use test;
show tables;
create table streaming(item varchar(30),count int);
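
As a quick sanity check that these commands worked and that the credentials used later can reach the table, here is a throwaway Scala snippet (it assumes the root/root login used in step (3)):

import java.sql.DriverManager

object TableCheck {
  def main(args: Array[String]) {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true", "root", "root")
    val rs = conn.createStatement().executeQuery("select count(*) from streaming")
    rs.next()
    println("rows in streaming: " + rs.getInt(1)) // 0 on a fresh table
    conn.close()
  }
}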

(3) Write a database connection pool class in Java:

package cn.itcast.spark.day7;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    // Load the MySQL JDBC driver once, when the class is first used.
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Lazily create a pool of 5 connections on the first call, then hand one out.
    // Note: poll() returns null if all 5 connections are currently checked out.
    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true",
                            "root",
                            "root"
                    );
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Put a borrowed connection back into the pool.
    // Synchronized because LinkedList is not thread-safe.
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
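
A minimal usage sketch for this pool: borrow a connection, use it, and return it in a finally block so the pool is not drained when an insert throws. The same pattern appears in the Spark code in step (4):

val conn = ConnectionPool.getConnection()
try {
  val stmt = conn.createStatement()
  stmt.executeUpdate("insert into streaming(item,count) values('spark',1)")
  stmt.close()
} finally {
  ConnectionPool.returnConnection(conn) // always hand the connection back
}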

(4) Write the Spark Streaming job:

package cn.itcast.spark.day7

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sqlTest {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("w")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Lines arrive from the netcat server started in step (5).
    val lines = ssc.socketTextStream("101.132.122.75", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordcount = words.map(x => (x, 1)).reduceByKey(_ + _)

    // foreachPartition borrows one connection per partition
    // instead of opening one per record.
    wordcount.foreachRDD(rdd => {
      rdd.foreachPartition(eachPartition => {
        val conn = ConnectionPool.getConnection()
        try {
          // A PreparedStatement sidesteps the quoting problems of
          // string-concatenated SQL and is reused for every record.
          val stmt = conn.prepareStatement("insert into streaming(item,count) values(?,?)")
          eachPartition.foreach(record => {
            stmt.setString(1, record._1)
            stmt.setInt(2, record._2)
            stmt.executeUpdate()
          })
          stmt.close()
        } finally {
          ConnectionPool.returnConnection(conn)
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
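
Because a partition can contain many records, a hedged refinement is JDBC batching: queue the rows with addBatch and submit them together with executeBatch, rather than issuing one executeUpdate per record. A sketch only, assuming the same conn and eachPartition as above:

// Inside foreachPartition, with conn already borrowed from the pool:
val stmt = conn.prepareStatement("insert into streaming(item,count) values(?,?)")
eachPartition.foreach { case (item, count) =>
  stmt.setString(1, item)
  stmt.setInt(2, count)
  stmt.addBatch() // queue the row instead of executing it immediately
}
stmt.executeBatch() // submit all queued rows together
stmt.close()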

(5) Start netcat and send some test data:

root@spark-master:~# nc -lk 9999
spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

Each 5-second batch should then show up as rows in the streaming table (check with select * from streaming; in the mysql client).

(6) Submit the job:

/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

Note that the connector jar version here (5.1.35) does not match the 5.1.38 declared in pom.xml; point the path at whichever jar is actually installed. Also, --driver-class-path only puts the jar on the driver's classpath; when submitting to a real cluster, distribute it to the executors as well (for example with --jars), since the JDBC writes in foreachPartition run there.

Reposted from: https://www.cnblogs.com/hmy-blog/p/7798840.html

Original: https://www.cnblogs.com/it-deepinmind/p/14363682.html