数据收集之binlog同步 Maxwell --->Kafka

配置MySQL

MySQL 开启Binlog

 1 #开启binlog
 2 #修改my.cnf配置文件 增加如下内容
 3 [root@node2 /root]# vim /etc/my.cnf
 4 
 5 [mysqld]
 6 #binlog文件保存目录及binlog文件名前缀
 7 #binlog文件保存目录: /var/lib/mysql/
 8 #binlog文件名前缀: mysql-binlog
 9 #mysql向文件名前缀添加数字后缀来按顺序创建二进制日志文件 如mysql-binlog.000006 mysql-binlog.000007
10 log-bin=/var/lib/mysql/mysql-binlog
11 #选择基于行的日志记录方式
12 binlog-format=ROW
13 #服务器 id
14 #binlog数据中包含server_id,标识该数据是由那个server同步过来的
15 server_id=1

MySQL 配置权限

 1 --创建Maxwell的用户
 2 mysql>CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY 'maxwell_sync_1';
 3 -- Maxwell需要在待同步的库上建立schema_database库,将状态存储在`schema_database`选项指定的数据库中(默认为`maxwell`)
 4 
 5 --授权
 6 mysql>GRANT ALL on maxwell.* to 'maxwell_sync'@'192.168.197.130';
 7 --解释:grant all指的是授权所有操作权限(增删改查),*.*指的是所有数据库,maxwell指的是用户名,maxwell_sync是密码,192.168.197.130指的是所要授权的远程IP地址
 8 
 9 
10 mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';
11 
12 mysql>FLUSH PRIVILEGES;  --刷新,使修改生效

Maxwell

下载解压 

[root@node2 /data/software]# wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

[root@node2 /data/software]# tar -zxvf maxwell-1.17.1.tar.gz

maxwell配置并同步至kafka  

    将maxwell工具下载到linux机器上,主要是配置config.properties文件,重要的配置参考如下:
    
log_level=info   
host=
user=
password=
port=
jdbc_options=autoReconnect=true  // mysql 超时重连
schema_database=  // 用于在mysql中新建一个binlog相关的数据库实例
producer=kafka
kafka.bootstrap.servers=
kafka_topic=
kafka.compression.type=snappy
kafka.retries=1
kafka.acks=1
kinesis_stream=maxwell
include_dbs=  // 需要处理的数据库实例
include_tables= // 需要处理的表格,用逗号分隔
kafka_version=0.9.0.1
client_id= // 标识符,可以包含英文
replica_server_id= // 只能是数字
expire_logs_days=0 //防止binlog断,maxwell失败
启动maxwell   配置文件
     nohup  bin/maxwell --config config.properties --log_level DEBUG &
 
命令行启动maxwell
#输出到kafka
bin/maxwell --user='maxwell_sync' --password='maxwell_sync_1' --host='localhost'  --port=3306  --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_version=0.9 --kafka_topic=rotopic

#抽取多个库可添加include_dbs,逗号分隔
#指定数据表 include_tables ,逗号分隔
#maxwell模拟mysql slave,所以多个maxwell进程时,每个进程的client.id及replica_server_id保证不同
#binlog如果断了,可能会maxwell失败,最好设置mysql的expire_logs_days=0

#输出到控制台用如下配置
bin/maxwell --user='maxwell_sync' --password='maxwell_sync_1' --host='localhost'  --port=3306  --producer=stdout
kafka

启动kafka

#开启kafka消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic rotopic --from-beginning 

改变数据库内容可看到如下结果:

问题描述:

1com.google.code.or.net.TransportException: Access denied; you need the REPLICATION SLAVE privilege for this operation  ##用户权限问题导致;

原文地址:https://www.cnblogs.com/GuoSamael/p/9772392.html