通过canal实现把MySQL数据实时增量到kafka

说明:我们有一个业务需要把mysql中一些表实时同步到大数据集群hbase上面,我们先通过sqoop把表中数据全量导入到hbase中,然后再通过canal定位的某个binlog的position,来实现增量同步,canal官网提供了java/go接口,直接写入到Kafka,然后通过sparkstreaming实时写入到hbase中

一. 通过sqoop把mysql表中的数据全量导入到hbase中(需要安装sqoop)

sqoop import 
--connect jdbc:mysql://ip:port/database 
--username username 
--password password 
--table user_info 
--hbase-create-table 
--hbase-table user_info 
--hbase-row-key id 
--column-family order_info

二. 精确定位到binlog位点,进行启动

1. 查看当前数据库的binlog日志,在数据库中通过show binary logs 查看

mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.001112 |    375374 |
| mysql-bin.001113 |    366569 |
| mysql-bin.001114 |    360112 |
| mysql-bin.001115 |    101198 

2. 查看当前binlog日志的position(一般最大的binlog以及最大的position)

show binlog events in 'mysql-bin.001115';

3. 需要先重启canal服务(如果之前正运行canal)

cd /usr/local/canal/bin && ./stop.sh && ./startup.sh

4. 修改zookeeper对应destination里面的配置

说明:我这边是在配置文件里面配置了zookeeper,如果没有配置,则会在你相应的destination目录下面生成meta.dat文件,也只需修改到对应的binlog和position即可

1)连接zookeeper

./zkCli.sh -server zk-address:2181

2)查看对应destination的配置(其中test为destination的名称)

(CONNECTED) 2] get /otter/canal/destinations/test/1001/cursor
"journalName":"mysqlbin.002908"
"position":198601951

3)把上面配置中journalName和position修改为自己需要的binlog日志和偏移量

 (CONNECTED) 3]set /otter/canal/destinations/d_aura_jike/1001/cursor {xxx}

5. 修改一下对应destination的配置文件(主要是触发使其生效)

vim /usr/local/canal/conf/test/instance.properties

6. 通过canal提供的java/go接口,来测试数据的同步(官网有例子https://github.com/alibaba/canal)

原文地址:https://www.cnblogs.com/654wangzai321/p/10655257.html