[DB] Reading MySQL with Flink

Approach

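There are two ways to create a table in Flink:

  • Import the table structure from a file (static; commonly used for batch computation)
  • Convert a DataStream or DataSet into a Table (dynamic)

The program below takes the second route: it reads a MySQL table into a DataSet through JDBCInputFormat, registers the DataSet as a table, and queries it with SQL.
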
package com.kaikeba.mysql.demo

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Flink2Mysql {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    // Read from the JDBC data source by building a JDBCInputFormat
    val jdbcDataSet: DataSet[Row] =
      env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.cj.jdbc.Driver")
        .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")
        .setUsername("root")
        .setPassword("Chen1227+")
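        .setQuery("select * from filter")
        // The RowTypeInfo should list one type per column returned by the query, in order (here: two INT columns)
        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))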
        .finish()
      )

    // Register the DataSet as a table
    tEnv.registerDataSet("tb", jdbcDataSet)
    // Run the query
    val table = tEnv.sqlQuery("select * from tb")
    // Convert the Table back to a DataSet and print it
    tEnv.toDataSet[Row](table).print()
  }
}  
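
The DB URL passed to setDBUrl carries its options as URL-encoded query parameters, which is why the time zone appears as `GMT%2B8` rather than `GMT+8`. The following is a minimal sketch of how such a URL could be assembled using only `java.net.URLEncoder` from the JDK. The object `JdbcUrlSketch` and the helper `buildJdbcUrl` are hypothetical names introduced for illustration; they are not part of Flink or the MySQL driver.

```scala
import java.net.URLEncoder

object JdbcUrlSketch {
  // Hypothetical helper (an assumption, not a Flink or MySQL driver API):
  // joins host, port, database and options into a MySQL JDBC URL,
  // URL-encoding each option value so that characters like '+' survive.
  def buildJdbcUrl(host: String, port: Int, database: String,
                   options: Seq[(String, String)]): String = {
    val base = s"jdbc:mysql://$host:$port/$database"
    val query = options
      .map { case (key, value) => s"$key=${URLEncoder.encode(value, "UTF-8")}" }
      .mkString("&")
    // Only append '?' when there is at least one option
    if (query.isEmpty) base else s"$base?$query"
  }

  def main(args: Array[String]): Unit = {
    val url = buildJdbcUrl(
      "127.0.0.1", 3306, "flink-mysql",
      Seq("serverTimezone" -> "GMT+8", "characterEncoding" -> "UTF-8", "useSSL" -> "false"))
    // Prints the same URL string used in the program above
    println(url)
  }
}
```

Building the URL from a list of options keeps the encoding in one place, so a value such as `GMT+8` does not have to be escaped by hand.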

References

Reading and writing MySQL with Flink

https://blog.csdn.net/Android_xue/article/details/102705711

https://blog.csdn.net/ranyizhang/article/details/103759251

https://www.cnblogs.com/Gxiaobai/p/12645497.html

Accessing MySQL from Flink stream processing

https://blog.csdn.net/u012447842/article/details/89175772

Flink examples

https://blog.csdn.net/xianpanjia4616/article/details/98318750

Original article: https://www.cnblogs.com/cxc1357/p/13798279.html