First Look at DTLE: Unidirectional, Aggregation, and Fan-out Replication

Environment preparation

Hostname   IP address      Deployed services
study01    10.186.65.68    DTLE, MySQL
study02    10.186.65.71    DTLE, MySQL
study03    10.186.65.72    DTLE, MySQL

PS: If you are not deploying with containers, first install three or more MySQL instances and enable binlog and GTID.
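A quick check on each instance confirms the prerequisites (standard MySQL system variables; all three should report ON):

mysql> show variables like 'log_bin';
mysql> show variables like 'gtid_mode';
mysql> show variables like 'enforce_gtid_consistency';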

DTLE overview

dtle (Data-Transformation-le) is a CDC tool developed and open-sourced by Shanghai ActionTech (上海爱可生信息技术股份有限公司). Its main features are:

  1. Multiple data transfer modes

    • Supports link compression
    • Supports homogeneous and heterogeneous transfers
    • Supports transfers across network boundaries
  2. Multiple data processing modes

    • Supports database/table/row-level data filtering
  3. Multiple data channel modes

    • Supports many-to-many data transfer
    • Supports loopback transfer
  4. Multiple source/target types

    • Supports MySQL-to-MySQL data transfer
    • Supports MySQL-to-Kafka data transfer
  5. Cluster mode

    • Provides reliable metadata storage
    • Performs automatic task assignment
    • Supports automatic failover

Unidirectional replication / aggregation / fan-out

Common replication scenarios supported by DTLE:

  1. By source/target layout

    • One source replicating to one target
    • Multiple sources aggregating into one target
    • One source splitting across multiple targets
  2. By network type

    • Data transfer within a network
    • Data transfer across networks (link compression / link throttling can be applied)
  3. By cluster size

    • A single dtle instance handling a single data channel
    • A dtle cluster handling multiple data channels


DTLE unidirectional replication

  1. Download the DTLE RPM package
https://github.com/actiontech/dtle/releases/tag/v3.21.03.0
  2. Install the RPM package
[root@study01 ~]# rpm -ivh dtle-3.21.03.0.x86_64.rpm --prefix=/data/dtle
  3. Edit the consul configuration file. If no install prefix is given at install time, it lives under /etc/dtle by default.
[root@study01 dtle]# pwd
/data/dtle/etc/dtle

[root@study01 dtle]# cat consul.hcl 
# Rename for each node
node_name = "consul0"   # consul node name; must be unique across cluster nodes
data_dir = "/data/dtle/var/lib/consul"
ui = true

disable_update_check = true

# Address that should be bound to for internal cluster communications
bind_addr = "0.0.0.0"
# Address to which Consul will bind client interfaces, including the HTTP and DNS servers
client_addr = "10.186.65.68"    # local IP address
advertise_addr = "10.186.65.68" # local IP address
ports = {
  # Customize if necessary. -1 means disable.
  #dns = -1
  #server = 8300
  #http = 8500
  #serf_wan = -1
  #serf_lan = 8301
}

limits = {
  http_max_conns_per_client = 4096
}

server = true
# For single node
bootstrap_expect = 1    # single consul node

# For 3-node cluster
#bootstrap_expect = 3   # three consul nodes; enable when deploying a cluster
#retry_join = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # will use default serf port

log_level = "INFO"
log_file = "/data/dtle/var/log/consul/"

  4. Edit the nomad configuration file; again, the default location is /etc/dtle if no install prefix was specified.
[root@study01 dtle]# ls
consul.hcl  nomad.hcl

[root@study01 dtle]# pwd
/data/dtle/etc/dtle

[root@study01 dtle]# cat nomad.hcl
name = "nomad0" # rename for each node 定义nomad名字,部署集群名字不可重复
datacenter = "dc1"
data_dir  = "/data/dtle/var/lib/nomad"
plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"

log_level = "Info"
log_file = "/data/dtle/var/log/nomad/"

disable_update_check = true

bind_addr = "0.0.0.0"   # listen address
# change ports if multiple nodes run on a same machine
ports {
  http = 4646
  rpc  = 4647
  serf = 4648
}
addresses {
  # Default to `bind_addr`. Or set individually here.
  #http = "127.0.0.1"
  #rpc  = "127.0.0.1"
  #serf = "127.0.0.1"
}
advertise {
  http = "10.186.65.68:4646"
  rpc  = "10.186.65.68:4647"
  serf = "10.186.65.68:4648"
}

server {
  enabled          = true   # enable server mode

  bootstrap_expect = 1
  # Set bootstrap_expect to 3 for multiple (high-availability) nodes.
  # Multiple nomad nodes will join with consul.
}

client {
  enabled = true
  options = {
    "driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt"
  }

  # Will auto join other server with consul.
}

consul {
  # dtle-plugin and nomad itself use consul separately.
  # nomad uses consul for server_auto_join and client_auto_join.
  # Only one consul can be set here. Write the nearest here,
  #   e.g. the one runs on the same machine with the nomad server.
  address = "10.186.65.68:8500" #客户端开启
}

plugin "dtle" {
  config {
    log_level = "Info" # repeat nomad log level here
    data_dir = "/data/dtle/var/lib/nomad"
    nats_bind = "10.186.65.68:8193"
    nats_advertise = "10.186.65.68:8193"
    # Repeat the consul address above.
    consul = "10.186.65.68:8500"    #对接consul地址

    # By default, API compatibility layer is disabled.
    #api_addr = "127.0.0.1:8190"   # for compatibility API
    nomad_addr = "10.186.65.68:4646" # compatibility API need to access a nomad server

    publish_metrics = false
    stats_collection_interval = 15
  }
}

  5. Start the consul and nomad services
systemctl restart dtle-consul.service
systemctl restart dtle-nomad.service

[root@study01 dtle]# ps -ef | grep dtle
dtle      8580     1  1 14:21 ?        00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcl
dtle      8660     1 10 14:22 ?        00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hcl
root      8720  2787  0 14:22 pts/0    00:00:00 grep --color=auto dtle
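
Optionally confirm that the listeners match the configuration (4646-4648 for nomad, 8500 for consul, 8193 for the dtle NATS transport):

[root@study01 dtle]# ss -lntp | grep -E '4646|4647|4648|8500|8193'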
  6. Open the nomad and consul web UIs to check the cluster (nomad at http://10.186.65.68:4646, consul at http://10.186.65.68:8500)

  7. On the source MySQL instance, create a test database and table, and insert some rows

mysql> create database test;
mysql> use test;
mysql> create table t1 (id int primary key);
mysql> insert into t1 values(1),(2),(3);
mysql> select * from t1;
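The jobs below connect as test/test. If that account does not exist yet, here is a sketch of the grants a CDC tool like dtle typically needs on the source; the exact minimal privilege set may vary by dtle version:

mysql> create user 'test'@'%' identified by 'test';
mysql> grant select, replication slave, replication client on *.* to 'test'@'%';
-- the target instance additionally needs write privileges on the replicated schemas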
  8. Create the job's JSON file
[root@study01 dtle]# cat job.json
{
  "Job": {
    "ID": "job1",
    "Datacenters": ["dc1"],
    "TaskGroups": [{
      "Name": "src",
      "Tasks": [{
        "Name": "src",
        "Driver": "dtle",
        "Config": {
          "Gtid": "",
          "ReplicateDoDb": [{
            "TableSchema": "test",
            "Tables": [{
              "TableName": "t1"
            }]
          }],
          "ConnectionConfig": {
            "Host": "10.186.65.68",
            "Port": 3333,
            "User": "test",
            "Password": "test"
          }
        }
      }]
    }, {
      "Name": "dest",
      "Tasks": [{
        "Name": "dest",
        "Driver": "dtle",
        "Config": {
          "ConnectionConfig": {
            "Host": "10.186.65.72",
            "Port": 4444,
            "User": "test",
            "Password": "test"
          }
        }
      }]
    }]
  }
}
  • "ID" is the job name; "TableSchema"/"TableName" select the database and table to replicate
  • The src task's ConnectionConfig holds the source connection info; the dest task's holds the target's
  9. Use curl to call the nomad agent's HTTP API and create the job
[root@study01 dtle]# curl -XPOST "http://10.186.65.68:4646/v1/jobs" -d @job.json -s | jq
{
  "EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646",
  "EvalCreateIndex": 151,
  "JobModifyIndex": 150,
  "Warnings": "",
  "Index": 151,
  "LastContact": 0,
  "KnownLeader": false
}
  • -d @ takes the path of the job JSON file; it is run here from the file's own directory, so no absolute path is needed
  • jq extracts and pretty-prints fields from the JSON response (similar in spirit to grep) and must be installed separately
  10. Check the status of the created job
[root@study01 dtle]# curl -XGET "http://10.186.65.68:4646/v1/job/job1" -s | jq '.Status'
  • /v1/job/{ID of the created job}
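Beyond .Status, the standard nomad endpoints expose more detail; for example, listing the job's allocations (one per task group, so job1 should show the src and dest groups):

[root@study01 dtle]# curl -s "http://10.186.65.68:4646/v1/job/job1/allocations" | jq '.[].TaskGroup'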
  11. Connect to the target database and check whether the data has been replicated
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| dtle               |  # schema created by dtle; holds a table recording the job's GTID position
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| test               |  # the replicated database
| universe           |
+--------------------+

mysql> select * from test.t1;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
+----+
  12. Insert a few more rows on the source database and test again
mysql> use test;
mysql> insert into t1 values(7),(8),(9);

Now check the target database to see whether the incremental rows have arrived:

mysql> select * from test.t1;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
|  7 |
|  8 |
|  9 |
+----+
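
To see what dtle keeps on the target, you can list the metadata schema's tables; exact table names vary across dtle versions, so treat this as a way to explore rather than a fixed contract:

mysql> show tables from dtle;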

HTTP API and the nomad command-line tool

The relationship between nomad and consul

Nomad itself uses consul for multi-node registration and discovery, while the dtle nomad plugin uses consul to store task metadata. In other words, when a nomad agent creates a job, the job's metadata, including its replication position, is recorded in consul. Deleting a job is therefore a two-step affair: removing the job itself is not enough, the consul record must be deleted as well. If it is not, the next job created under the same name will resume replication from the position recorded in consul.
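
The stored metadata can be inspected through consul's standard KV API, for instance by listing every key under the dtle prefix (the exact key layout depends on the dtle version):

[root@study01 dtle]# curl -s "10.186.65.68:8500/v1/kv/dtle/?keys" | jq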

HTTP API

The curl commands above actually call the HTTP API of the nomad agent, submitting the local job.json file to the agent.

Using the nomad command-line tool

The nomad tool launches a job from an HCL file; the JSON format cannot be used here. Template files ship under usr/share/dtle/scripts relative to the install prefix (/usr/share/dtle/scripts by default).

  1. First delete the existing job via curl
[root@study01 dtle]# curl -XDELETE 10.186.65.68:4646/v1/job/job1?purge=true
{"EvalID":"c3e69b56-eaab-fa02-afbb-a15e0b395170","EvalCreateIndex":198,"JobModifyIndex":197,"VolumeEvalID":"","VolumeEvalIndex":0,"Index":198,"LastContact":0,"KnownLeader":false}
  2. After deleting the job, clear its consul record with the following command
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
true
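Since the two DELETE calls always go together, a small shell helper (hypothetical, named drop_job here) keeps them in one place:

# delete a dtle job and its consul metadata in one step; pass the job name as $1
drop_job() {
  curl -XDELETE "10.186.65.68:4646/v1/job/$1?purge=true"
  curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/$1?recurse"
}
# usage: drop_job job1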
  3. Create a job with the nomad command

    • Copy an HCL template to work from
    cp usr/share/dtle/scripts/example.job.hcl .
    
    • Edit the HCL file
    [root@study01 dtle]# mv example.job.hcl job.hcl
    
    [root@study01 dtle]# vim job.hcl    # edit the source/target connection info
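
    Before submitting, the file can be checked for syntax errors with nomad's standard validate subcommand:
    [root@study01 dtle]# ./usr/bin/nomad job validate job.hcl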
    
  4. Create the job from this HCL file with the nomad command

[root@study01 dtle]# ./usr/bin/nomad job run job.hcl
==> Monitoring evaluation "21c79d55"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Allocation "8640b0af" created: node "e1be240c", group "dest"
    Allocation "e44c8d29" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "21c79d55" finished with status "complete"
  5. Check the status of the job we just created
[root@study01 dtle]# ./usr/bin/nomad job status
ID    Type     Priority  Status   Submit Date
job1  service  50        running  2021-05-07T16:26:21+08:00
  6. Insert data on the source and verify it on the target
  7. Stop the job and purge it
[root@study01 dtle]# ./usr/bin/nomad job stop -purge job1
==> Monitoring evaluation "dce8f4f3"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "dce8f4f3" finished with status "complete"
  • The consul record still has to be deleted manually
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"

Other nomad commands

  1. List the server nodes
[root@study01 dtle]# ./usr/bin/nomad server members
Name           Address       Port  Status  Leader  Protocol  Build   Datacenter  Region
nomad0.global  10.186.65.68  4648  alive   true    2         0.11.1  dc1         global
  2. List the client nodes
[root@study01 dtle]# ./usr/bin/nomad node status
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready
  3. View a specific job
[root@study01 dtle]# ./usr/bin/nomad job status {jobname}
  4. Check the nomad version
[root@study01 dtle]# ./usr/bin/nomad version
Nomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)
  5. Check the dtle plugin version on a given node
[root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtle
dtle      true      true     Healthy   2021-05-07T14:22:17+08:00
driver.dtle               = 1
driver.dtle.full_version  = 3.21.03.0-3.21.03.x-2df8ad7
driver.dtle.version       = 3.21.03.0
  6. Here the nomad command acts as an HTTP client to the nomad agent; if the agent is not at the default address, specify it, e.g. --address=http://127.0.0.1:4646
[root@study01 dtle]# ./usr/bin/nomad node status --address=http://10.186.65.68:4646
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready

MySQL aggregation replication

Prepare two source MySQL instances and one target MySQL instance. Record each source's GTID first, then create the test table and data.

  1. Operations on the src1 database
mysql> show master status\G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(1),(2),(3);

mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
+------+
  2. Operations on the src2 database
mysql> show master status\G
Executed_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(4),(5),(6);
  3. Create the job file for src1
[root@study01 dtle]# cat job_src1.hcl
job "job1" {
  datacenters = ["dc1"]
 
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"
          Port = 4444
          User = "test"
          Password = "test"
        }
 
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }
 
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}
  4. Create the job file for src2
[root@study01 dtle]# cat job_src2.hcl
job "job2" {
  datacenters = ["dc1"]
 
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
          }]
        }]
        DropTableIfExists = false
        Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.71"
          Port = 5555
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"
          Port = 4444
          User = "test"
          Password = "test"
        }
 
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }
 
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}
  • The two files differ in the job name, the Gtid, and the source connection information (host and port), as the diff below confirms
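A quick diff shows exactly where they differ (assuming the files are exactly as printed above):

[root@study01 dtle]# diff job_src1.hcl job_src2.hcl
1c1
< job "job1" {
---
> job "job2" {
15c15
<         Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
---
>         Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
18,19c18,19
<           Host = "10.186.65.68"
<           Port = 3333
---
>           Host = "10.186.65.71"
>           Port = 5555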
  5. Create the jobs
[root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl
==> Monitoring evaluation "28ae70e4"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "95f9a76b"
    Allocation "d07787a4" created: node "e1be240c", group "dest"
    Allocation "f46d9291" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "28ae70e4" finished with status "complete"

[root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl
==> Monitoring evaluation "1d2da1f7"
    Evaluation triggered by job "job2"
    Evaluation within deployment: "716b11a5"
    Allocation "2dd7fe5c" created: node "e1be240c", group "src"
    Allocation "724ec296" created: node "e1be240c", group "dest"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "1d2da1f7" finished with status "complete"
  6. Verify the data on the target database
mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
+------+
  7. Insert a row on either source (src1 below), then check whether the increment reaches the target
mysql> insert into t2 values (7);
Query OK, 1 row affected (0.01 sec)

mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    7 |
+------+
4 rows in set (0.01 sec)
  8. Verify the data on the target database
mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
|    7 |
+------+
7 rows in set (0.01 sec)

MySQL fan-out replication

Prepare three MySQL instances: one source and two targets. Rows are replicated to different targets according to a filter condition.

  1. First record the source database's current GTID
mysql> show master status\G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143
  2. Create a test table and insert some rows
mysql> use test;
mysql> create table t3(id int);
mysql> insert into t3 values(1),(2);
  3. Create the dst1 job file
[root@study01 dtle]# cat job_dst1.hcl
job "dst1" {
  datacenters = ["dc1"]
 
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t3"
            Where = "id<10"
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.71"
          Port = 5555
          User = "test"
          Password = "test"
        }
 
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }
 
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}
  4. Create the dst2 job file

The two files differ in the job name, the Where condition, and the target connection information; a sketch of job_dst2.hcl follows below.
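
The post does not print job_dst2.hcl, so here is a minimal sketch based on the stated differences, assuming the complementary filter id >= 10 and the 10.186.65.72:4444 instance as the second target (per the environment table):

job "dst2" {
  datacenters = ["dc1"]
 
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t3"
            Where = "id>=10"   # complementary condition: rows with id >= 10 route here
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"   # same source GTID as dst1
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"   # same source instance as dst1
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"   # second target (assumed from the environment table)
          Port = 4444
          User = "test"
          Password = "test"
        }
      }
    }
  }
 
  reschedule {
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}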

  5. Create the jobs
[root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl
[root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl

[root@study01 dtle]# ./usr/bin/nomad status
ID    Type     Priority  Status   Submit Date
dst1  service  50        running  2021-05-07T20:13:05+08:00
dst2  service  50        running  2021-05-07T20:13:10+08:00
  6. Verify the routing: rows with id < 10 should land on the instance on 71, and rows with id >= 10 on the instance on 72

  7. Insert rows on the source, then check each target

mysql> insert into t3 values(9),(10);
mysql> select * from test.t3;
+------+
| id   |
+------+
|    1 |
|    2 |
|    9 |
|   10 |
+------+
  8. Check the data on 71
mysql> select * from test.t3;
+------+
| id   |
+------+
|    1 |
|    2 |
|    9 |
+------+
  9. Check the data on 72
mysql> select * from test.t3;
+------+
| id   |
+------+
|   10 |
+------+

Original post: https://www.cnblogs.com/easydb/p/14742583.html