FLINK实例(4): CONNECTORS(3)MySQL读写

1 工程目录结构

 pom.xml

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.9.2</version>
        </dependency>

2 flink 读取MySQL

1)  通过自定义source提交

MySQLSource

package com.atguigu.flink.source

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
  * Bounded Flink source that runs a fixed query against the MySQL `sensor`
  * table and emits every returned row as a [[SensorReading]].
  *
  * JDBC resources are created in `open` and released in `close`. `cancel` only
  * signals the read loop to stop, so the handles are never closed twice or
  * closed underneath a running query.
  */
class MySQLSource extends RichSourceFunction[SensorReading] {
  // JDBC handles; they stay null until open() has run.
  var conn:Connection = null
  var ps:PreparedStatement = null
  // Cleared by cancel() (called from another thread) to stop the loop in run().
  @volatile private var running: Boolean = true

  /** Loads the MySQL driver, opens the connection and prepares the query. */
  override def open(parameters: Configuration): Unit = {
    // Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // Connect to the database
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false","root","123456")
    ps = conn.prepareStatement("select * from sensor limit 5")
  }

  /**
    * Executes the prepared query and forwards each row downstream.
    *
    * Failures are deliberately not caught: a JDBC error propagates to Flink so
    * the job fails visibly instead of finishing with a silently empty stream.
    */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val resultSet: ResultSet = ps.executeQuery()
    try {
      while (running && resultSet.next()) {
        val id: String = resultSet.getString("id")
        val curTime: Long = resultSet.getLong("timestamp")
        val timepreture: Double = resultSet.getDouble("timepreture")
        sourceContext.collect(SensorReading(id, curTime, timepreture))
      }
    } finally {
      // The cursor belongs to this call; statement and connection are released in close().
      resultSet.close()
    }
  }

  /** Requests the read loop to stop; resources are released in `close`. */
  override def cancel(): Unit = {
    running = false
  }

  /** Releases the statement first and then the connection, even if the first close fails. */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }
}

主程序入口 MySQLSourceSinkApp

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._


/**
  * Streaming entry point: reads sensor rows through `MySQLSource`, writes them
  * back through `MySQLSink` and also prints them.
  */
object MySQLSourceSinkApp {
  def main(args: Array[String]): Unit = {
    // Streaming environment with parallelism 1.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Input side: the custom MySQL source.
    val readings: scala.DataStream[SensorReading] = streamEnv.addSource(new MySQLSource)

    // Output side: the custom MySQL sink.
    readings.addSink(new MySQLSink())

    // Print the same stream.
    readings.print()

    // Submit the job.
    streamEnv.execute()
  }
}

2) 通过 JDBCInputFormat方式

    
  val sql_read = "select * from sensor limit 5"

/**
  * Reads rows from a JDBC database with `JDBCInputFormat` and converts them to
  * [[SensorReading]].
  *
  * The query must return exactly three columns in the order
  * (String id, Long timestamp, Double reading), matching the declared
  * `RowTypeInfo` and the positional field access below.
  *
  * @param env    batch execution environment used to create the input
  * @param url    JDBC connection URL
  * @param driver fully qualified JDBC driver class name
  * @param user   database user name
  * @param pwd    database password
  * @param sql    query to execute
  * @return a DataSet of sensor readings built from the query result
  */
def readMysql(env:ExecutionEnvironment,url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
  // Create the JDBC-backed input
  val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername(driver)
    .setDBUrl(url)
    .setUsername(user)
    .setPassword(pwd)
    .setQuery(sql)
    .setRowTypeInfo(new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.DOUBLE_TYPE_INFO))
    .finish())

  // Convert each generic Row into the domain type; the last expression is the result.
  dataResult.map(x => {
    val id = x.getField(0).asInstanceOf[String]
    val timestamp = x.getField(1).asInstanceOf[Long]
    val timepreture = x.getField(2).asInstanceOf[Double]
    SensorReading(id, timestamp, timepreture)
  })
}

主程序入口MySQLSourceSinkApp2

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
  * Batch entry point that reads the `sensor` table through `JDBCInputFormat`
  * and prints the readings.
  */
object MySQLSourceSinkApp2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"

    /**
      * Reads rows with `JDBCInputFormat` and converts them to [[SensorReading]].
      * The query must return (String, Long, Double) columns in that order,
      * matching the declared `RowTypeInfo`.
      */
    def readMysql(env:ExecutionEnvironment,url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] ={
      // Create the JDBC-backed input
      val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

      // Convert each generic Row into the domain type; the last expression is the result.
      dataResult.map(x=> {
        val id = x.getField(0).asInstanceOf[String]
        val timestamp = x.getField(1).asInstanceOf[Long]
        val timepreture = x.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      })
    }

    // Read the data from MySQL
    val readStream = readMysql(env, url, driver ,username ,password ,sql_read)

    // A DataSet is only a plan until it is consumed: print() attaches a sink and
    // triggers execution, so the query actually runs and its rows are shown.
    readStream.print()
  }
}

3 flink 写入 MySQL

1)  通过自定义Sink提交

MySQLSink

package com.atguigu.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
  * Flink sink that inserts every incoming [[SensorReading]] into the MySQL
  * `sensor` table.
  *
  * The connection and prepared statement are created in `open` and released in
  * `close`.
  */
class MySQLSink extends RichSinkFunction[SensorReading]{
  // JDBC handles; they stay null until open() has run.
  var conn:Connection = null
  var ps:PreparedStatement = null
  val INSERT_CASE:String = "INSERT INTO sensor (id, timestamp,timepreture) " + "VALUES (?, ?, ?) "

  /** Loads the MySQL driver, opens the connection and prepares the insert statement. */
  override def open(parameters: Configuration): Unit = {
    // Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // Connect to the database
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false","root","123456")
    ps = conn.prepareStatement(INSERT_CASE)
  }

  /**
    * Inserts one reading.
    *
    * Failures are not caught: a JDBC error propagates to Flink so a record is
    * never dropped silently.
    */
  override def invoke(value:SensorReading): Unit = {
    ps.setString(1,value.id)
    ps.setLong(2,value.timestamp)
    ps.setDouble(3,value.timepreture)
    // One row per call, so a plain update replaces the one-element batch.
    ps.executeUpdate()
  }

  /** Releases the statement first and then the connection, even if the first close fails. */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }
}

主程序入口MySQLSourceSinkApp

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._


/**
  * Streaming entry point: reads sensor rows through `MySQLSource`, writes them
  * back through `MySQLSink` and also prints them.
  */
object MySQLSourceSinkApp {
  def main(args: Array[String]): Unit = {
    // Streaming environment with parallelism 1.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Input side: the custom MySQL source.
    val readings: scala.DataStream[SensorReading] = streamEnv.addSource(new MySQLSource)

    // Output side: the custom MySQL sink.
    readings.addSink(new MySQLSink())

    // Print the same stream.
    readings.print()

    // Submit the job.
    streamEnv.execute()
  }
}

2)  通过JDBCOutputFormat

Flink 没有内置的、专门用于写入 MySQL 的 sink,但提供了 JDBCOutputFormat 类:只要提供对应的 JDBC driver,就可以把它当作 sink 使用。

JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。

    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"


    /**
      * Writes a DataSet of rows to a JDBC database through `JDBCOutputFormat`
      * and runs the job.
      *
      * @param env        batch execution environment that executes the job
      * @param outputData rows whose fields fill the placeholders of `sql` in order
      * @param url        JDBC connection URL
      * @param user       database user name
      * @param pwd        database password
      * @param sql        parameterised insert statement
      * @param driver     fully qualified JDBC driver class name; defaults to the
      *                   MySQL driver, so existing six-argument calls are unchanged
      */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                   driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .finish())
      // output() only registers the sink; execute() actually runs the job.
      env.execute("insert data to mysql")
      print("data write successfully")
    }

主程序入口MySQLSourceSinkApp2

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
  * Batch entry point that reads the `sensor` table through `JDBCInputFormat`
  * and writes the same readings back through `JDBCOutputFormat`.
  */
object MySQLSourceSinkApp2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"
    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    /**
      * Reads rows with `JDBCInputFormat` and converts them to [[SensorReading]].
      * The query must return (String, Long, Double) columns in that order,
      * matching the declared `RowTypeInfo`.
      */
    def readMysql(env:ExecutionEnvironment,url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] ={
      // Create the JDBC-backed input
      val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

      // Convert each generic Row into the domain type; the last expression is the result.
      dataResult.map(x=> {
        val id = x.getField(0).asInstanceOf[String]
        val timestamp = x.getField(1).asInstanceOf[Long]
        val timepreture = x.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      })
    }

    // Read the data from MySQL
    val readStream = readMysql(env, url, driver ,username ,password ,sql_read)

    // Convert the readings into the Row format that JDBCOutputFormat accepts
    val outputData = readStream.map(x => {
      val row = new Row(3)
      row.setField(0, x.id)
      row.setField(1, x.timestamp)
      row.setField(2, x.timepreture)
      row
    })

    /**
      * Writes the rows through `JDBCOutputFormat` and runs the job.
      * `driver` defaults to the MySQL driver class.
      */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                   driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .finish())
      // output() only registers the sink; execute() actually runs the job.
      env.execute("insert data to mysql")
      print("data write successfully")
    }

    // Insert the data into MySQL
    writeMysql(env, outputData, url, username, password, sql_write, driver)
  }
}

4 scala读取 MySQL

MysqlUtil

package com.atguigu.flink.utils

import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}

import com.alibaba.fastjson.JSONObject

import scala.collection.mutable.ListBuffer

/** Plain-JDBC helper that runs a query against MySQL and returns the rows as JSON objects. */
object MysqlUtil {
  def main(args: Array[String]): Unit = {
    val list:  List[ JSONObject] = queryList("select * from sensor limit 5")
    println(list)
  }

  /**
    * Executes `sql` and returns one `JSONObject` per row, keyed by column name.
    *
    * The result set, statement and connection are closed in `finally` blocks,
    * so they are released even when the query or the row conversion throws.
    *
    * @param sql query to execute
    * @return the rows in result-set order; empty when the query returns no rows
    */
  def queryList(sql:String):List[JSONObject]={
    // Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // Connect to the database
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false","root","123456")
    try {
      val stat: Statement = conn.createStatement
      try {
        val rs: ResultSet = stat.executeQuery(sql)
        try {
          val md: ResultSetMetaData = rs.getMetaData
          // The column count is fixed for the whole result, so read it once.
          val columnCount = md.getColumnCount
          val resultList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]()
          while (rs.next) {
            val rowData = new JSONObject()
            // JDBC column indexes are 1-based.
            for (i <- 1 to columnCount) {
              rowData.put(md.getColumnName(i), rs.getObject(i))
            }
            resultList += rowData
          }
          resultList.toList
        } finally {
          rs.close()
        }
      } finally {
        stat.close()
      }
    } finally {
      conn.close()
    }
  }

}

5 flink kafka Mysql 实现exactly once

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680290.html

原文地址:https://www.cnblogs.com/qiu-hua/p/13680290.html