flink Stream Api cdc demo

最近在研究 flink-cdc ,因为想到table api 的 cdc 都是针对单表,如果在同一个数据库上,有很多表需要实时采集(比如: 100 张表),会不会对 mysql 造成压力,如果 mysql 数据量又比较大,是不是会对 mysql 所在服务器造成磁盘和网络的压力。

对 binlog 有所了解的都知道,binlog 是不区分数据库和表的,所以在读取 binlog 的时候,即使只需要一张表的 binlog,也需要解析全部的 binlog 文件,如果 cdc 的表很多,可以想象,资源的消耗是成倍的增加。

基于这样的问题,有个新的思路,用一个任务把所有需要的表的 binlog 全部解析成 json 发到 kafka 中,将 mysql 的压力转嫁到 kafka 上,而mysql 都可以承受的压力,对 kafka 来说就称不上是压力了(不过这样跟直接部署个 canal 或 Debezium 基本一样了)。

## 官网案例

flink-cdc 官网 Stream API 案例如下:

官网链接: https://github.com/ververica/flink-cdc-connectors/wiki#usage-for-datastream-api

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

我们需要做的就是将 StringDebeziumDeserializationSchema 修改了,并写个 kafka sink

## 解析

解析 Debezium 格式的 binlog,官方提供了两个 DeserializationSchema: StringDebeziumDeserializationSchema 和 RowDataDebeziumDeserializeSchema , StringDebeziumDeserializationSchema 就是输出 Debezium 的 SourceRecord 的 toString 结果,而 RowDataDebeziumDeserializeSchema 需要预先定义表的 scheme,跟我们的需求不同。

所有,我自己解析了 Debezium 的 SourceRecord,将结果转成了 json 的,并把一些如: host、端口、数据库、表等信息加入了其中(可能会有分库分表)

/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private String host;
    private int port;


    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();

        jsonObject.addProperty("host", host);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
        String[] name = record.valueSchema().name().split("\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);
        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert update
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);

        out.collect(jsonObject.toString());
    }

    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    // byte[] resultByte = after.getBytes(field.name());
                    // jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }

        return jo;
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

核心方法是 deserialize 解析数据 和 parseRecord 解析表中字段内容

解析出来的数据如下:

插入:

sql : insert into user_log1(user_id, item_id, category_id, behavior, ts) values('venn1', 'item_1', 'category_1', 'read', now());
{
"host":"localhost","port":3306,"file":"binlog.000002","pos":13781,"ts_sec":null,"db":"venn","table":"user_log","operator_type":"c",
"after":{"id":16,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619358456000},"parse_time":1619360320}

更新:

sql : update user_log set user_id = 'zhangsan1' where id = 10;

{"host":"localhost","port":3306,"file":"binlog.000002","pos":14205,"ts_sec":1619360393,"db":"venn","table":"user_log","operator_type":"u",
"after":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},
"before":{"id":10,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360394}

删除:

delete from user_log where id = 10;
{
"host":"localhost","port":3306,"file":"binlog.000002","pos":14598,"ts_sec":1619360441,"db":"venn","table":"user_log","operator_type":"d","before":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360441}

注: operator_type: c : create, u: update, d: delete, r: read
before 为原始数据, after 为插入、修改后的数据

## sink

由于需要解析的表可能很多,所有单独写了个 sink,将不同表的数据,发往不同的 topic,代码如下:

@Override
public void invoke(String element, Context context) {

    JsonObject jsonObject = parser.parse(element).getAsJsonObject();
    String db = jsonObject.get("db").getAsString();
    String table = jsonObject.get("table").getAsString();
    // topic 不存在就自动创建
    String topic = db + "_" + table;
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, element);
    kafkaProducer.send(record);
}

如果不需要将数据写到不同的topic,直接用flink 提供的 FlinkkakfaProducer 即可

遇到个问题: MySQL 8 的报错,不能检索公钥,url 中不能指定 allowPublicKeyRetrieval 参数

Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Public Key Retrieval is not allowed
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
    at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:230)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:871)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:866)
    at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:412)
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:328)
    ... 11 more

在 git 上提了个 issue,看下大佬的回复吧,不行就自己改下源码,添加这个参数 : https://github.com/ververica/flink-cdc-connectors/issues/173

完整代码参见 github : https://github.com/springMoon/flink-rookie  MySqlBinlogSourceExample

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

 

原文地址:https://www.cnblogs.com/Springmoon-venn/p/14702474.html