debezium 数据变更工具使用

1.  作用
简单概述就是CDC(change data capture),实时数据分析领域用的比较多
 
2. 简单使用(基于官网的docker 说明)
 备注: 测试没有使用守护进程模式为了方便测试
a. zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.6
 
b. kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.6
 
c. mysql 
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.6
 
d. mysql-client
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
 
查看表信息
use inventory;
show tables;
SELECT * FROM customers;
 
e. kafka connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.6
 
 进行connect 信息查看
curl -H "Accept:application/json" localhost:8083/
curl -H "Accept:application/json" localhost:8083/connectors/
 
f. 监控MySQL 数据库变更
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

返回信息如下:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
查看注册的connect
 curl -H "Accept:application/json" localhost:8083/connectors/
 ["inventory-connector"]
g. 查看数据变更
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.6 watch-topic -a -k dbserver1.inventory.customers

应该会看到包含下面的信息:
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

进行数据变更
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
应该会看到下面的信息
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

删除、添加的信息类似,同时对于监控数据变更的服务,在停止之后,重新启动数据还是可以同步过来的
 
 
3. 说明
大数据,微服务应用开发、单体应用向微服务迁移的时候使用起来可以减少好多开发的工作量
4. 参考资料
http://debezium.io/docs/tutorial/
遗留系统重建实践
Migrating_to_Microservices_Databases_Red_Hat.pdf (https://developers.redhat.com/promotions/migrating-to-microservice-databases/)
 
原文地址:https://www.cnblogs.com/rongfengliang/p/7616822.html