flink 自定义 mysql source

mysql source

 1 import java.sql.{Connection, DriverManager, PreparedStatement}
 2 
 3 import org.apache.flink.configuration.Configuration
 4 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
 5 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 6 
 7 object FlinkDemo06_CustomSource_mysql {
 8     def main(args: Array[String]): Unit = {
 9         //1 环境
10         val env = StreamExecutionEnvironment.getExecutionEnvironment
11         //2 流对象
12         import org.apache.flink.api.scala._
13         val dStream: DataStream[Flight] = env.addSource(new MySqlSource)
14         //3 计算 统计次数
15         dStream.map(k=>(1,1)).keyBy(0).sum(1).print()
16         //4 执行
17         env.execute("custom mysql source")
18     }
19     
20     class MySqlSource extends RichSourceFunction[Flight] {
21         private var connection: Connection = null
22         private var ps: PreparedStatement = null
23         
24         override def open(parameters: Configuration): Unit = {
25             val driver = "com.mysql.jdbc.Driver"
26             val url = "jdbc:mysql://localhost:3306/test_db"
27             val username = "root"
28             Class.forName(driver)
29             connection = DriverManager.getConnection(url, username, "123456")
30             val sql = "select id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID from flight"
31             ps = connection.prepareStatement(sql)
32         }
33         
34         override def close(): Unit = {
35             if (connection != null) {
36                 connection.close()
37             }
38             if (ps != null) {
39                 ps.close()
40             }
41         }
42         
43         override def run(ctx: SourceFunction.SourceContext[Flight]): Unit = {
44             //获取结果集,遍历并输出,关闭结果集
45             val rs = ps.executeQuery()
46             while(rs.next()) {
47                 val flight = Flight(rs.getFloat("avgTicketPrice"),rs.getString("cancelled"),rs.getString("carrier"),rs.getString("dest"),rs.getString("destAirportID"),rs.getString("origin"),rs.getString("originAirportID"))
48                 ctx.collect(flight)
49             }
50             rs.close()
51         }
52         
53         override def cancel(): Unit = {}
54     }
55     
56     case class Flight(avgTicketPrice: Float, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)    
57 }
View Code

数据准备

 1 import java.io.{BufferedReader, FileReader}
 2 import java.sql.{Connection, DriverManager, PreparedStatement}
 3 
 4 import org.slf4j.LoggerFactory
 5 
 6 object FlinkDemo06_DataHelp {
 7     val logger = LoggerFactory.getLogger(FlinkDemo06_DataHelp.getClass)
 8     
 9     val url = "jdbc:mysql://localhost:3306/test_db"
10     val driver = "com.mysql.jdbc.Driver"
11     val username = "root"
12     
13     case class Flight(avgTicketPrice: String, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)
14     
15     def main(args: Array[String]): Unit = {
16         Class.forName(driver)
17         var conn: Connection = null
18         var stat: PreparedStatement = null
19         val br = new BufferedReader(new FileReader("I:\projectImplement\dataWareHouse\test-es\data\630data.csv"))
20         var line: String = null
21         try {
22             conn = DriverManager.getConnection(url, username, "123456")
23             val sql =
24                 s"""
25                    |insert into flight
26                    | (id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID)
27                    |values
28                    | (null,?,?,?,?,?,?,?)
29                    |""".stripMargin
30             stat = conn.prepareStatement(sql)
31             
32             while ((line = br.readLine()) != null) {
33                 val vals = line.split(",")
34                 stat.setFloat(1, vals(0).toFloat)
35                 stat.setString(2, vals(1))
36                 stat.setString(3, vals(2))
37                 stat.setString(4, vals(3))
38                 stat.setString(5, vals(4))
39                 stat.setString(6, vals(5))
40                 stat.setString(7, vals(6))
41                 val count = stat.executeUpdate()
42                 if (count != 1) logger.error("插入失败")
43                 stat.clearParameters()
44                 
45                 Thread.sleep(10000L)
46             }
47         } catch {
48             case e: Exception => {
49                 logger.error(e.toString)
50             }
51         } finally {
52             try {
53                 if (br != null) {
54                     br.close()
55                 }
56                 if (stat != null) {
57                     stat.close()
58                 }
59                 if (conn != null) {
60                     conn.close()
61                 }
62             } catch {
63                 case e: Exception => logger.info(e.toString)
64             }
65         }
66     }
67 }
68 
69 /*
70 create table flight (
71 id int auto_increment,
72 avgTicketPrice float,
73 cancelled varchar(5),
74 carrier varchar(100),
75 dest varchar(100),
76 destAirportID varchar(100),
77 origin varchar(100),
78 originAirportID varchar(100),
79 primary key(id)
80 );
81  */
View Code
 1 642.5951538085938 ,false,Logstash Airways,OR Tambo International Airport,JNB,Al Maktoum International Airport ,DWC
 2 328.17108154296875,false,ES-Air,Licenciado Benito Juarez International Airport ,AICM ,Copenhagen Kastrup Airport,CPH
 3 774.8056030273438 ,false,Kibana Airlines ,Stockholm-Arlanda Airport,ARN,San Diego International Airport,SAN
 4 650.2644653320312 ,false,JetBeats,Ukrainka Air Base ,XHBU ,Adolfo Suarez Madrid— Barajas Airport ,MAD
 5 227.354248046875,false,Kibana Airlines ,Winnipeg / James Armstrong Richardson International Airport,YWG,Winnipeg / James Armstrong Richardson International Airport ,YWG
 6 575.6364135742188 ,false,Logstash Airways,Gimpo International Airport ,GMP,Genoa Cristoforo Colombo Airport ,GE01 
 7 619.7527465820312 ,true ,ES-Air,Tokyo Haneda International Airport ,HND,Shanghai Hongqiao International Airport ,SHA
 8 612.101318359375,false,Logstash Airways,Reno Tahoe International Airport,RNO,Dubai International Airport,DXB
 9 998.6478881835938 ,false,Kibana Airlines ,Xi'an Xianyang International Airport ,XIY,Kempegowda International Airport ,BLR
10 573.07666015625,false,Kibana Airlines ,Kansai International Airport,KIX,Beijing Capital International Airport ,PEK
View Code
原文地址:https://www.cnblogs.com/xiefeichn/p/13174977.html