C# 使用Canal监控Mysql

一、canal安装与配置

1、电脑中首先需要下载安装canal,可以去阿里的github上面下载(更多版本选择),也可以从下面的地址进行提取

链接:https://pan.baidu.com/s/1oysSnGP-e_Zw6eEk9NT8rg
提取码:j33k

canal支持多种语言使用。

 2、下载完成,将它解压,如:

  

3、 修改D:Toolsalibabacanalconfexample下的 instance.properties(刚解压出的example文件夹中,可以将除了instance.properties文件以外的文件全部删除)

修改instance.properties文件的相关mysql数据库配置

4、完成配置以后,进入bin文件夹,windows双击startup.bat文件(注意,canal需要java运行环境,如果电脑没有java环境的,可以只配置一个jre)

 

  如上图显示,canal就正常运行了。接下来就只需要开启mysql的binlog日志,在自己的程序中使用canal即可

二、对 mysql的操作

1、查看binlog是否启用,在mysql中执行 show variables like '%log_bin%';
2、电脑,右键管理,打开服务,找到mysql的my.ini配置文件位置,添加
server_id=1918
log_bin = mysql-bin
#binlog_format="ROW"
binlog_format="MIXED" #开启MIXED模式
#binlog_format="STATEMENT"

3、重启mysql服务就可以启用binlog。开启MIXED模式才可看见执行的语句

------三种模式详细解释:
互联网公司使用MySQL的功能较少(不用存储过程、触发器、函数),选择默认的Statement level

用到MySQL的特殊功能(存储过程、触发器、函数)则选择Mixed模式

用到MySQL的特殊功能(存储过程、触发器、函数),又希望数据最大化一直则选择Row模式

 完成对mysql的配置,可以在mysql的安装目录下的data文件夹中看到000001文件,就是binlog日志文件。如:

三、C#代码

阿里的github上面也有详细的说明。

1、首先需要用到canal的包,去nuget下载CanalSharp.Client

2、在代码中添加引用

using CanalSharp.Client.Impl;

//canal 配置的 destination,默认为 example
var destination = "example";
//创建一个简单CanalClient连接对象(此对象不支持集群)传入参数分别为 canal地址、端口、destination、用户名、密码
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");//这里可以就这样
//连接 Canal
connector.Connect();
//订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
//允许所有数据 .*\..*
//允许某个库数据 库名\..*
//允许某些表 库名.表名,库名.表名"
connector.Subscribe(".*\..*");

while (true)
{
    //获取数据 1024表示数据大小 单位为字节
    var message = connector.Get(1024);
    //批次id 可用于回滚
    var batchId = message.Id;
    if (batchId == -1 || message.Entries.Count <= 0)
    {
        Thread.Sleep(300);
        continue;
    }
    PrintEntrys(message.Entries);
}
/// <summary>
/// 输出数据
/// </summary>
/// <param name="entrys">一个entry表示一个数据库变更</param>
private void PrintEntrys(List<Entry> entrys)
{
    foreach (var entry in entrys)
    {
        if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
        {
            continue;
        }

        RowChange rowChange = null;
        try
        {
            //获取行变更
            rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
        }
        catch (Exception ex)
        {
            logMan.Error("mysql获取行变更错误:" + ex.Message, ex);
        }

        if (rowChange != null)
        {
            //变更类型 insert/update/delete 等等
            EventType eventType = rowChange.EventType;                   

            //输出 insert/update/delete 变更类型列数据
            foreach (var rowData in rowChange.RowDatas)
            {
                if (eventType == EventType.Delete)
                {
                    PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
                }
                else if (eventType == EventType.Insert)//添加
                {
                    PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                }
                else
                {
                    if (entry.Header.TableName == "tablename1")//这里可以过滤自己想监测的表名
                        PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName, rowData.BeforeColumns.ToList());
                }
            }
        }
    }
}
private void PrintColumn(List<Column> columns, string tbName, List<Column> oldColumns = null)
{
    //修改的值,以及修改列的id值
    string newNum = "", userid = "";
    foreach (var column in columns)
    {    
        if (column.Updated)
        {
            //输出列 列值 是否变更 也可以拼出执行的语句
            Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");             
        }
    }
}

 附上一个完整的控制台应用。完成一、二中对mysql,canal的配置后,可以直接运行下列的控制台进行测试

using CanalSharp.Client.Impl;
using Com.Alibaba.Otter.Canal.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            //canal 配置的 destination,默认为 example
            var destination = "example";
            //创建一个简单 CanalClient 连接对象(此对象不支持集群)传入参数分别为 canal 地址、端口、destination、用户名、密码
            var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
            //连接 Canal
            connector.Connect();
            //订阅,同时传入 Filter。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
            //允许所有数据 .*\..*
            //允许某个库数据 库名\..*
            //允许某些表 库名.表名,库名.表名
            connector.Subscribe(".*\..*");
            Console.WriteLine("监控创建成功,开始监控。");
            while (true)
            {
                //获取数据 1024表示数据大小 单位为字节
                var message = connector.Get(1024);
                //批次id 可用于回滚
                var batchId = message.Id;
                if (batchId == -1 || message.Entries.Count <= 0)
                {
                    Thread.Sleep(300);
                    continue;
                }

                PrintEntry(message.Entries);
            }
        }
        /// <summary>
        /// 输出数据
        /// </summary>
        /// <param name="entrys">一个entry表示一个数据库变更</param>
        private static void PrintEntry(List<Entry> entrys)
        {
            Console.WriteLine("监控到改动,进入监控输出。");
            foreach (var entry in entrys)
            {
                if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
                {
                    continue;
                }

                RowChange rowChange = null;

                try
                {
                    //获取行变更
                    rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
                    Console.WriteLine("行变更。");
                }
                catch (Exception ex)
                {
                    //_logger.LogError(ex.ToString());
                }

                if (rowChange != null)
                {
                    //变更类型 insert/update/delete 等等
                    EventType eventType = rowChange.EventType;
                    //输出binlog信息 表名 数据库名 变更类型
                    //_logger.LogInformation(
                    //    $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                    Console.WriteLine("改变的数据库名:" + entry.Header.SchemaName);
                    //输出 insert/update/delete 变更类型列数据
                    foreach (var rowData in rowChange.RowDatas)
                    {
                        if (eventType == EventType.Delete)
                        {
                            PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
                        }
                        else if (eventType == EventType.Insert)
                        {
                            PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                        }
                        else
                        {
                            PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 输出每个列的详细数据
        /// </summary>
        /// <param name="columns"></param>
        private static void PrintColumn(List<Column> columns, string tbName)
        {
            Console.WriteLine("有改动");
            foreach (var column in columns)
            {
                //if (column.Name == lookName) userid = column.Value;
                //Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
                if (column.Updated)
                {
                    //输出列明 列值 是否变更
                    Console.WriteLine($"修改的表名:{tbName},列名:{column.Name} : {column.Value}  update=  {column.Updated}");
                    //修改以后的值 column.Name=
                    //if (column.Name == alterName) newNum = column.Value;
                    //if (column.Name == lookName) userid = column.Value;
                    //Console.WriteLine($"update {tbName} set {alterName}={column.Value} where {lookName}={userid}");
                }
            }
        }
    }
}
原文地址:https://www.cnblogs.com/hllxy/p/12572943.html