canal与kafka的结合使用

centos7中安装zk: https://www.cnblogs.com/tdyang/p/13895839.html

centos7中安装kafka:  https://www.cnblogs.com/tdyang/p/13898004.html

第一步、下载安装canal

本安装目录:/usr/local/soft,创建一个canal目录,版本1.1.4

cd /usr/local/soft/
mkdir canal
cd canal

 下载解压,这个路径可能会有所变化,canal的github地址:https://github.com/alibaba/canal,如果下载速度慢,建议用迅雷下载

迅雷下载这些文件速度还是不错

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

第二步,修改canal的配置文件

1、conf下的canal.properties文件,修改以下两项:建议用xftp连接打开文件,快速找到需要修改的配置。

canal.serverMode=kafka

kafka的服务器地址:

canal.mq.servers = 192.168.146.102:9092

2、修改example/instance.properties,mysql的数据库连接地址

canal.instance.master.address=192.168.146.102:3306

 canal的用户和密码,这里在mysql数据库里面设置

canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

# 接着上面,新增一个配置
canal.instance.defaultDatabaseName=canaltest
# 这个topic会自动创建
canal.mq.topic=canal-topic

 mq的配置中还有其他的一些设置可自己根据需要设置

通过以上配置,还需要开启的mysql的binlog配置

第三步:修改mysql的配置文件

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

 登录数据库以后,查看是否开启了binlog, ON表示开启了

show variables like 'log_%';

设置和上面instance.properties配置文件中canal的用户名和密码:

-- 创建canal用户
CREATE USER canal IDENTIFIED BY '123456';

-- 给canal用户分配查询和复制的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@'%';

刷新权限:

FLUSH PRIVILEGES;

创建测试数据库:

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';

CREATE DATABASE `canaltest` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;

第四步:启动各个服务

1、启动zk,在zk的bin目录下

./zkServer.sh start

2、启动kafka,在kafka的安装目录下,后台启动

nohup ./bin/kafka-server-start.sh ./config/server.properties & 

启动kafka消费端:这个topic和上面example/instance.properties配置文件里面一致

./kafka-console-consumer.sh --bootstrap-server 192.168.146.102:9092 --topic canal-topic

3、启动canal,在bin目录下

sh startup.sh 

4、修改测试数据库数据,看kafka消费端是否收到数据

测试:在一张dept表中加入一条数据

 kafka消费端收到消息:

 这样一个简单的测试就完成了

原文地址:https://www.cnblogs.com/tdyang/p/13913539.html