Flink实例(六十二): connectors(十三)clickhouse 写 入 (二)使用JDBC connector写入ClickHouse

https://help.aliyun.com/document_detail/175749.html

本节主要介绍如何利用 ClickHouse JDBC connector,使用不同版本的 Flink 写入数据到 ClickHouse 中。

背景信息

Flink 在 1.11.0 版本对其 JDBC connector 进行了一次较大的重构:

  • 重构之前(1.10.1 及之前版本),包名为 flink-jdbc 。
  • 重构之后(1.11.0 及之后版本),包名为 flink-connector-jdbc 。

二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:

 
API名称flink-jdbcflink-connector-jdbc
DataStream 不支持 支持
Table API (Legecy) 支持 不支持
Table API (DDL) 不支持 不支持

flink-connector-jdbc 完全移除了对 Table API (Legecy) 的支持,只能通过 DDL 的方式调用 Table API。但是,Table DDL 方式硬编码了其所支持的 JDBC Driver,不支持 ClickHouse。

下面,我们依次以 Flink 1.10.1 + flink-jdbc 以及 Flink 1.11.0 + flink-connector-jdbc 为例,介绍 Flink 写入 ClickHouse 的方法。

Flink 1.10.1 + flink-jdbc

Flink 1.10.1 及之前版本需要采用flink-jdbc+Table API的方式写入数据到ClickHouse。本节我们使用 Maven 及 Flink 1.10.1 版本进行示例。

  1. 用 mvn archetype:generate 命令创建项目,生成过程中根据提示输入 group-id 和 artifact-id 等。
$ mvn archetype:generate 
      -DarchetypeGroupId=org.apache.flink 
      -DarchetypeArtifactId=flink-quickstart-scala 
      -DarchetypeVersion=1.10.1

  2 编辑 pom.xml 中的<dependencies />小节添加依赖

//添加 Flink Table API 相关的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

//添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
View Code

  3 创建数据写入程序文件。

示例程序使用CsvTableSource读入 CSV 文件产生 Table Source,使用JDBCAppendTableSink将数据写入到 ClickHouse Sink 中。
说明
  • 由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  • 在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
  TableEnvironment,
  TableSchema,
  Types,
  ValidationException
}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath =
      "/<your-path-to-test-csv>/source.csv"
    val CkJdbcUrl =
      "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<your-username>"
    val CkPassword = "<your-password>"
    val BatchSize = 500 // 设置您的batch size

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    val insertIntoCkSql =
      """
        |  INSERT INTO sink_table (
        |    name, grade, rate
        |  ) VALUES (
        |    ?, ?, ?
        |  )
      """.stripMargin

//将数据写入 ClickHouse Sink 
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      .setUsername(CkUsername)
      .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("name", "grade", "rate"),
      Array(Types.STRING, Types.LONG, Types.FLOAT),
      sink
    )

    tEnv.insertInto(resultTable, "sink")

    env.execute("Flink Table API to ClickHouse Example")
  }
}
View Code
参数说明:
  • SourceCsvPath:源CSV文件路径。
  • CkJdbcUrl:目标ClickHouse集群地址。
  • CkUsername:目标ClickHouse集群用户名。
  • CkPassword:目标ClickHouse集群对应密码。

4 编译运行。

$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

Flink 1.11.0 及之后版本需要采用flink-connector-jdbc+DataStream的方式写入数据到ClickHouse。本节我们使用 Maven 及 Flink 1.11.0 版本进行示例。

  1 用 mvn archetype:generate 命令创建项目,生成过程中会提示输入 group-id 和 artifact-id 等。

$ mvn archetype:generate 
      -DarchetypeGroupId=org.apache.flink 
      -DarchetypeArtifactId=flink-quickstart-scala 
      -DarchetypeVersion=1.11.0

  2 编辑 pom.xml 中的<dependencies />小节添加依赖。

//添加 Flink Table API 相关的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

//添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver相关的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
View Code

  3 创建数据写入程序文件。

示例程序使用 CsvTableSource 读入 CSV 文件产生 Table Source,通过 TableEnvironment.toAppendStream 将 Table 转换为 DataStream。使用 JdbcSink 将数据写入到 ClickHouse 中。

说明
  • 由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  • 当前版本的 flink-connector-jdbc,使用 Scala API 调用 JdbcSink 时会出现 lambda 函数的序列化问题。我们只能采用手动实现 interface 的方式来传入相关 JDBC Statement build 函数(class CkSinkBuilder)。
     
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
  TableEnvironment,
  TableSchema,
  Types,
  ValidationException
}
import org.apache.flink.connector.jdbc._
import java.sql.PreparedStatement

//手动实现 interface 的方式来传入相关 JDBC Statement build 函数
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath =
      "/<your-path-to-test-csv>/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<your-username>"
    val CkPassword = "<your-password>"
    val BatchSize = 500 // 设置您的 batch size

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

//将 Table 转换为 DataStream
    val resultDataStream =
      tEnv.toAppendStream[(String, Long, Float)](resultTable)

    val insertIntoCkSql =
      """
        |  INSERT INTO sink_table (
        |    name, grade, rate
        |  ) VALUES (
        |    ?, ?, ?
        |  )
      """.stripMargin

//将数据写入 ClickHouse  JDBC Sink
    resultDataStream.addSink(
      JdbcSink.sink[(String, Long, Float)](
        insertIntoCkSql,
        new CkSinkBuilder,
        new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
          .withUrl(CkJdbcUrl)
          .withUsername(CkUsername)
          .withUsername(CkPassword)
          .build()
      )
    )

    env.execute("Flink DataStream to ClickHouse Example")
  }
}
View Code
参数说明:
  • SourceCsvPath:源CSV文件路径。
  • CkJdbcUrl:目标ClickHouse集群地址。
  • CkUsername:目标ClickHouse集群用户名。
  • CkPassword:目标ClickHouse集群对应密码。

4  编译运行。

$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13871460.html

原文地址:https://www.cnblogs.com/qiu-hua/p/13871460.html