OceanBase Beginner Practice 4: Migrating MySQL Data to an OceanBase Cluster

Practice Exercise 4 (Required): Migrate MySQL Data to an OceanBase Cluster

Review of Previous Practice Exercises

  1. Practice 1: Experience OceanBase with Docker
  2. Practice 2: Manually deploy an OceanBase cluster
  3. Practice 3: Use OBD to deploy a three-replica OceanBase cluster

Purpose

The goal of this exercise is to master the basic methods of migrating data from MySQL to OceanBase: mysqldump, DataX, Canal, and so on.

Exercise Content

Please record and share the following:

  • (Required) Use mysqldump to sync MySQL table schemas and data into an OceanBase MySQL tenant.
  • (Required) Use DataX to configure offline sync of at least one table from MySQL to an OceanBase MySQL tenant.
  • (Optional) Use DataX to configure offline sync of at least one table from OceanBase to CSV, and from CSV to OceanBase.
  • (Optional) Use Canal to configure incremental sync from MySQL to an OceanBase MySQL tenant.

Implementation

Prepare the test data

unzip -qo tpcc-mysql-master.zip
cd tpcc-mysql-master/src
make
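
If the make step fails on a missing mysql.h or mysql_config, a C toolchain and the MySQL client development files are needed first; on an EL-family system this would look roughly like the following (package names are assumptions, adjust to your distribution):

# build prerequisites for tpcc-mysql (assumed package names)
yum install -y gcc make mysql-devel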

This builds the tpcc_load and tpcc_start binaries.

Load the data into the source (MySQL)
Create the tpcc database
CREATE DATABASE `tpcc` DEFAULT CHARACTER SET utf8;
Import the table definitions
# grep -ci 'create table' create_table.sql
mycli tpcc <./create_table.sql
mycli tpcc -e "show tables;"

Bulk-load the data (tpcc_load)
./tpcc_load -h progs -P 3308 -d tpcc -u root -w 1
## Usage
# ./tpcc_load -h
*************************************
*** TPCC-mysql Data Loader        ***
*************************************
./tpcc_load: option requires an argument -- 'h'
Usage: tpcc_load -h server_host -P port -d database_name -u mysql_user -p mysql_password -w warehouses -l part -m min_wh -n max_wh
* [part]: 1=ITEMS 2=WAREHOUSE 3=CUSTOMER 4=ORDERS

  • Check table sizes and row counts

    SELECT TABLE_NAME,DATA_LENGTH,INDEX_LENGTH,(DATA_LENGTH+INDEX_LENGTH) as length,TABLE_ROWS,concat(round((DATA_LENGTH+INDEX_LENGTH)/1024/1024,2), 'MB') as total_size 
    FROM information_schema.TABLES 
    WHERE TABLE_SCHEMA='tpcc' 
    order by length desc
    ;
    

Use mysqldump to sync the MySQL table schemas and data to the OceanBase MySQL tenant

Export the table schemas only (on the MySQL source)
mysqldump -h progs -uroot -proot -P3308 -d tpcc --set-gtid-purged=OFF --compact > tpcc_ddl.sql

Check the dump file for special syntax, variables, etc.
grep -Ei "SQL_NOTES|DEFINER|MAX_ROWS" tpcc_ddl.sql
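
If the grep reports matches, those clauses can be stripped before import, since the OceanBase MySQL tenant may not accept them; a minimal sketch with sed (patterns are assumptions, adjust to what the grep actually finds):

# drop DEFINER clauses and MAX_ROWS table options from the DDL dump
sed -i -e 's/DEFINER=[^ ]* //g' -e 's/ MAX_ROWS=[0-9]*//g' tpcc_ddl.sql
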
Export the table data only (on the MySQL source)
mysqldump -h progs -uroot -P3308 --single-transaction --set-gtid-purged=OFF -t tpcc  > tpcc_data.sql

Sync the data to the OceanBase MySQL tenant
Connect to the MySQL tenant with obclient and create the tpcc database
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A 

create database tpcc;

Import the table definitions
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A tpcc

MySQL [tpcc]> source /tmp/tpcc_ddl.sql

Import the table data
-- disable the foreign key check first
MySQL [tpcc]> set global foreign_key_checks=off;
MySQL [tpcc]> show global variables like '%foreign%';

MySQL [tpcc]> source /tmp/tpcc_data.sql
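
Once the data import finishes, the constraint check can be switched back on (assuming it was on before):

MySQL [tpcc]> set global foreign_key_checks=on;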

Compare the data volume on both ends

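A quick spot check, repeated per table (a sketch; connection details reused from the steps above):

# source MySQL
mycli tpcc -e "select count(*) from order_line;"
# OceanBase MySQL tenant
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A tpcc -e "select count(*) from order_line;"
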
Use DataX (offline) to sync table data from MySQL to OceanBase

Use DataX to configure offline sync of at least one table from MySQL to the OceanBase MySQL tenant

Deploy DataX
# download DataX
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

# unpack
tar -xf datax.tar.gz
cd datax

# remove the hidden files shipped under the plugin directories (run from inside the datax directory);
# leftover hidden files cause DataX to fail at startup with plugin-load errors
find ./plugin -name ".*" | xargs rm -f

Create the sync job configuration
Generate a template file
python ./bin/datax.py -r mysqlreader -w oceanbasev10writer

Edit the sync job file
# write the template into a job file
python ./bin/datax.py -r mysqlreader -w oceanbasev10writer > job/my2obce.json

# adjust it for the actual environment
vi job/my2obce.json
  • The completed job JSON file:

    {    
    "job": {
            "setting": {
                "speed": {
                    "channel": 2,
                },
                "errorLimit": {
                    "record": 10
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "root",
                            "splitPk": "no_o_id",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://progs:3308/tpcc"
                                    ],
                                    "table": [
                                        "new_orders"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "oceanbasev10writer",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password": "root",
                            "writerThreadCount": 5,
                            "column": [
                                "*"
                            ],
                            "connection": [
                               {
                                    "jdbcUrl": "||_dsc_ob10_dsc_||obce:pay_mysql_tat||_dsc_ob10_dsc_||jdbc:mysql://progs:2883/tpcc?useUnicode=true&characterEncoding=utf-8",
                                    "table": [
                                        "new_orders"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
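
Note on the writer's jdbcUrl: the oceanbasev10writer expects the form ||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://host:port/database, so the value above encodes the cluster obce and the tenant pay_mysql_tat in front of an ordinary MySQL JDBC URL.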
    
    
Start the sync job
python ./bin/datax.py ./job/my2obce.json

Check and confirm the sync result

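DataX prints a job summary at the end of the run (records read, write failures, elapsed time); in addition, the row count of the synced table should match on both ends, e.g.:

mycli tpcc -e "select count(*) from new_orders;"
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A tpcc -e "select count(*) from new_orders;"
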
Use DataX for offline data sync between CSV files and OceanBase

OceanBase to CSV sync
Edit the job configuration file
  • Generate a template
python ${DATAX_HOME}/bin/datax.py -r oceanbasev10reader -w txtfilewriter > ${DATAX_HOME}/job/ob2csv.json
  • Adjust the values for your environment

vi ${DATAX_HOME}/job/ob2csv.json

{
    "job":{
        "setting":{
            "speed":{
                "channel":10
            },
            "errorLimit":{
                "record":0, "percentage": 0.02
            }
        },
        "content":[
            {
                "reader":{
                    "name":"oceanbasev10reader",
                    "parameter":{
                        "where":"",
                        "timeout":10000,
                        "readBatchSize":10000,
                        "readByPartition":"true",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":["||_dsc_ob10_dsc_||obce:dev_mysql_tnt||_dsc_ob10_dsc_||jdbc:oceanbase://progs:2883/devdb?useUnicode=true&characterEncoding=utf-8"],
                                "table":["actor"]
                            }
                        ],
                        "username": "syncer",
                        "password": "Sy#r3210"
                    }
                },
                "writer":{
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/datax/",
                        "fileName": "actor",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "charset": "UTF-8",
                        "nullFormat": "\\N",
                        "fileDelimiter": ","
                    }
                }
            }
        ]
    }
}
Start the sync job
python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/ob2csv.json


Check and confirm

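txtfilewriter writes one file per channel under path, using the fileName value as a prefix plus a random suffix, so a quick look at the export (paths taken from the job above):

ls -l /tmp/datax/
head -5 /tmp/datax/actor*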

CSV to OceanBase sync
Edit the job configuration file
  • Generate a template
python ${DATAX_HOME}/bin/datax.py -r txtfilereader -w oceanbasev10writer > ${DATAX_HOME}/job/csv2ob.json
  • Adjust the values for your environment

vi ${DATAX_HOME}/job/csv2ob.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/datax"],
                        "fileName": "sakila_actor",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "preSql": [
                            "truncate table actor"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obce:dev_mysql_tnt||_dsc_ob10_dsc_||jdbc:oceanbase://progs:2883/devdb?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "actor"
                                ]
                            }
                        ],
                        "username": "syncer",
                        "password": "Sy#r3210",
                        "writerThreadCount": 10,
                        "batchSize": 100,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

Notes:

  • path: the directory must contain only the files to be imported; remove any other files, or they may interfere and cause the job to fail
  • fileName: the file-name prefix of the files to read
  • fieldDelimiter: the column delimiter of the CSV files
Start the sync job
python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/csv2ob.json


Check
select * from actor limit 20;
select count(*) cnt from actor;

Use Canal to configure incremental sync from MySQL to the OceanBase MySQL tenant

Deploy the server
Unpack the software
mkdir deployer && tar -xf canal.deployer-for-ob-rc2.tar.gz -C deployer
Edit the configuration files
  • Configure the canal-server main configuration file: vi conf/canal.properties

    #################################################
    ######### 		common argument		#############
    #################################################
    # tcp bind ip
    canal.ip = 192.168.10.181
    # register ip to zookeeper; IP of the host running the canal-server service
    canal.register.ip = 192.168.10.181
    # port the canal-server listens on (TCP mode only; non-TCP modes do not listen on it)
    canal.port = 11111
    # port for the canal-server metrics pull
    canal.metrics.pull.port = 11112
    # canal instance user/passwd
    # canal.user = canal
    # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    
    # canal admin config
    #canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    # admin auto register
    #canal.admin.register.auto = true
    #canal.admin.register.cluster =
    #canal.admin.register.name =
    
    # ZooKeeper connection string for the canal-server; required for coordination in cluster mode, may stay empty in standalone mode
    canal.zkServers =
    # flush data to zk: how often canal persists data to ZooKeeper, in milliseconds
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
    # tcp mode serves clients directly with no middleware in between; the others publish to a message queue
    canal.serverMode = tcp
    # flush meta cursor/parse position to file; directory where this data is stored
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 1024
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## meory store gets mode used MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    
    ## detecting config: heartbeat-check settings, used for HA
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config: which SQL statements to filter out
    canal.instance.filter.druid.ddl = true
    # whether to ignore DCL statements such as GRANT / CREATE USER; default false
    canal.instance.filter.query.dcl = false
    # whether to ignore DML statements such as INSERT / UPDATE / DELETE
    canal.instance.filter.query.dml = false
    # whether to ignore DDL statements such as CREATE TABLE / ALTER TABLE / DROP TABLE / RENAME TABLE / CREATE INDEX / DROP INDEX
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    canal.instance.filter.dml.insert = false
    canal.instance.filter.dml.update = false
    canal.instance.filter.dml.delete = false
    
    # binlog format/image check: ROW format is required; other formats do not raise errors, but no data gets synced
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config
    # parallel parser config; on a single-CPU host this must be set to false
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = canal
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    #################################################
    ######### 		destinations		#############
    #################################################
    # names of the instances created by this canal-server
    canal.destinations = example
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    # set this value to 'true' means that when binlog pos not found, skip to latest.
    # WARN: pls keep 'false' in production env, or if you know what you want.
    canal.auto.reset.latest.pos.mode = false
    
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring
    canal.instance.global.lazy = false
    canal.instance.global.manager.address = ${canal.admin.manager}
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    ######### 	      MQ Properties      #############
    ##################################################
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    canal.aliyun.uid=
    
    canal.mq.flatMessage = true
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    # Set this value to "cloud", if you want open message trace feature in aliyun.
    canal.mq.accessChannel = local
    
    canal.mq.database.hash = true
    canal.mq.send.thread.size = 30
    canal.mq.build.thread.size = 8
    
    ##################################################
    ######### 		     Kafka 		     #############
    ##################################################
    kafka.bootstrap.servers = 127.0.0.1:9092
    kafka.acks = all
    kafka.compression.type = none
    kafka.batch.size = 16384
    kafka.linger.ms = 1
    kafka.max.request.size = 1048576
    kafka.buffer.memory = 33554432
    kafka.max.in.flight.requests.per.connection = 1
    kafka.retries = 0
    
    kafka.kerberos.enable = false
    kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
    kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
    
    ##################################################
    ######### 		    RocketMQ	     #############
    ##################################################
    rocketmq.producer.group = test
    rocketmq.enable.message.trace = false
    rocketmq.customized.trace.topic =
    rocketmq.namespace =
    rocketmq.namesrv.addr = 127.0.0.1:9876
    rocketmq.retry.times.when.send.failed = 0
    rocketmq.vip.channel.enabled = false
    rocketmq.tag = 
    
    ##################################################
    ######### 		    RabbitMQ	     #############
    ##################################################
    rabbitmq.host =
    rabbitmq.virtual.host =
    rabbitmq.exchange =
    rabbitmq.username =
    rabbitmq.password =
    rabbitmq.deliveryMode =
    
    
    ##################################################
    ######### 		      Pulsar         #############
    ##################################################
    pulsarmq.serverUrl =
    pulsarmq.roleToken =
    pulsarmq.topicTenantPrefix = 
    

    If canal.destinations lists multiple instance names, a matching instance directory must be created under ./deployer/conf for each name, following the steps below:

    • Example:

      grep 'canal.destinations' ./deployer/conf/canal.properties
      canal.destinations = example, order
      
      • Create the corresponding instance directory

        cd ./deployer/conf
        cp -a example order
        

  • Configure the instance configuration file (using the example instance)

The example instance is used for illustration; adjust other instances the same way.

vi conf/example/instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2000

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# IP and port of the source MySQL whose binlog is read
canal.instance.master.address=192.168.10.181:3308
# binlog file name to start reading from
canal.instance.master.journal.name=
# binlog offset to start reading from
canal.instance.master.position=
# starting binlog timestamp
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# user and password for connecting to the source MySQL
canal.instance.dbUsername=syncer
canal.instance.dbPassword=Sy#r3210
# connection character set
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=devdb\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

Note:

  • The explanatory comments above are for documentation only; delete them from the actual configuration file in a running environment
Start the service
./bin/startup.sh
Check the service status
# check the server logs
tail -100f logs/canal/canal.log
tail -100f logs/canal/canal_stdout.log

# check the instance logs
tail -10f logs/example/example.log
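
The canal-server runs as a Java process whose main class is CanalLauncher, so a quick liveness check (a sketch; assumes a JDK with jps on the PATH):

jps | grep CanalLauncher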

Deploy the adapter
Unpack the software
mkdir adapter && tar -xf canal.adapter-for-ob-rc2.tar.gz -C adapter

Configure the adapter
  • Edit the launcher configuration: vi conf/application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        # address and port of the canal-server
        canal.tcp.server.host: 192.168.10.181:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
    
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: rdb
            key: obmysql 
            properties:
              jdbc.driverClassName: com.mysql.jdbc.Driver
              jdbc.url: jdbc:mysql://192.168.10.181:2883/devdb?useUnicode=true
              jdbc.username: syncer@dev_mysql_tnt#obce
              jdbc.password: Sy#r3210
    
  • Edit the RDB mapping file: conf/rdb/mytest_user.yml

    dataSourceKey: defaultDS
    destination: example 
    groupId: g1
    outerAdapterKey: obmysql
    concurrent: true
    dbMapping:
      mirrorDb: true
      database: devdb
    
    • destination: the canal instance name
    • outerAdapterKey: the key defined under outerAdapters in application.yml
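
Before starting the adapter, it helps to confirm that the JDBC target configured in application.yml is reachable; a sketch using obclient with the connection details from the config above:

obclient -h192.168.10.181 -P2883 -usyncer@dev_mysql_tnt#obce -p'Sy#r3210' -Ddevdb -e "select 1;"
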
Start the service
cd adapter && ./bin/startup.sh

Check
# check the adapter log
tail -100f logs/adapter/adapter.log

Verify incremental data sync
Make DML and DDL changes on the upstream MySQL (master)
  • DDL test
create table t2(id int primary key, name varchar(32)) partition by hash(id);
  • DML test
insert into t2 values(1, 'go'),(2, 'python'),(3, 'C++');
commit;

update t2 set name='C' where id = 3;
commit;

delete from t2 where id = 2;
Check the synced data on the target side
select * from t2;


Appendix

References

Original article: https://www.cnblogs.com/binliubiao/p/15729416.html