2.RABBITMQ 入门

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置

公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务。那么到后期的话,登录服务器之后,全是

一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端

该案例的接收端,源自网上的代码片段 片内容,做了部分修改之后使用

 

日志中心的 功能要求使用注入解耦,所以,这里我也解耦了,如果日至那边使用的是 autofac,我只里使用的MEF实现注入 所以定义了相关的接口对象

IMQContextFactory:

using Ecostar.MQLogger.Core.Infrastructure;
using System;

namespace Ecostar.MQConsumer.Core.Infrastructure
{
    /// <summary>
    ///     仅仅只有sender使用到
    /// </summary>
    public interface IMQContextFactory
    {
        MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog);
    }
}
View Code

对应的实现类:MQContextFactory

using Ecostar.MQLogger.Core.Infrastructure;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Security.Cryptography;

namespace Ecostar.MQConsumer.Core.Infrastructure
{
    /// <summary>
    ///     仅仅只有sender使用到
    /// </summary>
    [Export(typeof(IMQContextFactory))]
    public class MQContextFactory : IMQContextFactory
    {
        /// <summary>
        /// 上下文字典
        /// </summary>
        private static readonly Dictionary<string, MQContext> Contexts = new Dictionary<string, MQContext>();

        /// <summary>
        /// 上下文操作锁字典,只创建一次
        /// </summary>
        public static readonly Dictionary<string, object> contextLockers = new Dictionary<string, object>();

        /// <summary>
        /// 更新上下文操作锁字典时的锁,只创建一次
        /// </summary>
        private static readonly object contextLockersLocker = new object();

        /// <summary>
        /// 获取指定的上下文
        /// </summary>
        /// <param name="mqUri">mq地址</param>
        /// <param name="toLog">日志记录</param>
        /// <returns>上下文对象</returns>
        public MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog)
        {
            var key = MD5Encrypt(mqUri);
            var locker = GetFactoryLocker(key);

            lock (locker)
            {
                MQContext context;
                if (!Contexts.TryGetValue(key, out context))
                {
                    Guid contextId = Guid.NewGuid();
                    string logHeader = string.Format("[{0}]", contextId.ToString());

                    context = new MQContext()
                    {
                        ReceiveQueueName = "Logs",
                        Id = contextId
                    };
                    Console.WriteLine(logHeader + "   初始化发送上下文完毕");

                    // 获取连接
                    context.SendConnection = CreateConnection(mqUri);
                    context.SendConnection.AutoClose = false;
                    context.SendConnection.ConnectionShutdown += (o, e) => Console.WriteLine("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                    Console.WriteLine(logHeader + "   创建连接完毕", LogLevel.Trace);

                    // 获取通道
                    context.SendChannel = CreateChannel(context.SendConnection);
                    Console.WriteLine(logHeader + "   创建通道完毕", LogLevel.Trace);

                    Contexts.Add(key, context);

                }

                return context;
            }
        }

        #region 私有方法
        /// 创建连接
        /// </summary>
        /// <param name="mqUrl"></param>
        /// <returns></returns>
        private static IConnection CreateConnection(string mqUrl)
        {
            const ushort heartbeta = 120;

            var factory = new ConnectionFactory()
            {
                Uri = mqUrl,
                RequestedHeartbeat = heartbeta,
                AutomaticRecoveryEnabled = true
            };

            return factory.CreateConnection();
        }

        /// <summary>
        /// 创建通道
        /// </summary>
        /// <param name="connection"></param>
        /// <returns></returns>
        private static IModel CreateChannel(IConnection connection)
        {
            if (connection != null)
                return connection.CreateModel();
            return null;
        }


        /// <summary>
        /// 获取上下文操作锁
        /// </summary>
        /// <param name="contextKey">上下文工厂key</param>
        /// <returns></returns>
        private static object GetFactoryLocker(string contextKey)
        {
            lock (contextLockersLocker)
            {
                object locker;
                if (!contextLockers.TryGetValue(contextKey, out locker))
                {
                    locker = new object();
                    contextLockers.Add(contextKey, locker);
                }

                return locker;
            }
        }

        /// <summary>
        /// 获取字符的MD5值
        /// </summary>
        /// <param name="str"></param>
        /// <returns></returns>
        private static string MD5Encrypt(string str)
        {
            MD5 md5 = new MD5CryptoServiceProvider();
            byte[] result = md5.ComputeHash(System.Text.Encoding.Default.GetBytes(str));
            return System.Text.Encoding.Default.GetString(result);
        }
        #endregion


    }
}
View Code

注视我写的很明白,这部分的使用 是  生产者使用的类,也就是  发送消息


下面是消费者:

IReceiver.cs:

namespace Ecostar.MQConsumer.Core
{
    public interface IReceiver
    {
        /// <summary>
        ///     初始化接收程序
        /// </summary>
        /// <param name="mqUrls"></param>
        void InitialReceive(MQReceiverParam receiverParams);


    }
}
View Code

对应的实现类:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Threading;

namespace Ecostar.MQConsumer.Core
{
    [Export(typeof(IReceiver))]
    public class Receiver : IReceiver
    {
        private MQContext _context;
        private const ushort Heartbeta = 60;
        private string _queueName;
        private bool _isAutoAck;
        private List<string> _mqUrls;
        private Func<byte[], bool> _processFunction;
        private Action<string> _mqActionLogFunc;
        private MQConnectionFactory _ConnectionFactoryParams;

   
        public void InitialReceive(MQReceiverParam receiverParams)
        {
            _queueName       = receiverParams._queueName;
            _isAutoAck       = receiverParams._isAutoAck;
            _mqUrls          = receiverParams._mqUrls;
            _processFunction = receiverParams._processFunction;
            _mqActionLogFunc = receiverParams._mqActionLogFunc;
            _ConnectionFactoryParams = receiverParams.ConnectionFactoryParam;
            receiverParams._mqUrls.ForEach(url => InitReceive(_queueName, _isAutoAck, url));

        }

        /// <summary>
        /// 初始化某个节点的接收
        /// </summary>
        private void InitReceive(string queueName, bool isAutoAck, string mqUrl)
        {
            Guid contextId = Guid.NewGuid();
            string logHeader = string.Format("[{0}, {1}]", queueName, contextId.ToString());
            try
            {
                _context = new MQContext()
                {
                    Id = contextId,
                    ReceiveQueueName = queueName,
                    IsAutoAck = isAutoAck,
                    ReceiveConnection = new ConnectionFactory()
                    {
                        HostName = _ConnectionFactoryParams.HostName,
                        UserName = _ConnectionFactoryParams.UserName,
                        Password = _ConnectionFactoryParams.Password,
                        VirtualHost = _ConnectionFactoryParams.VirtualHost
                    }.CreateConnection()
                };

                // 监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性
                _context.ReceiveConnection.ConnectionShutdown += (o, e) =>
                {
                    _mqActionLogFunc("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                };
                // 获取通道
                _context.ReceiveChannel = _context.ReceiveConnection?.CreateModel();

                // 创建事件驱动的消费者
                var consumer = new EventingBasicConsumer(_context.ReceiveChannel);
                consumer.Received += (o, e) =>
                {
                    try
                    {
                        // 接受数据处理逻辑
                        // e.Body
                        var result = _processFunction(e.Body);

                        if (!isAutoAck)
                        {
                            if (!result)
                            {
                                Thread.Sleep(300);

                                // 未能处理完成的话,将消息重新放入队列头
                                _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                                _mqActionLogFunc("   消息未处理成功,将消息重新放入队列头");
                            }
                            else if (!_context.ReceiveChannel.IsClosed)
                            {
                                // 处理成功并且通道未关闭时ack回去,删除队列中的消息
                                _context.ReceiveChannel.BasicAck(e.DeliveryTag, false);
                                _mqActionLogFunc("   消息处理成功,发送Ack完毕");
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        Thread.Sleep(300);
                        if (!isAutoAck)
                        {
                            // 将消息重新放入队列头
                            _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                        }
                        _mqActionLogFunc("   处理数据发生异常:" + ex.Message + ex.StackTrace);
                    }
                };

                // 一次只获取一条消息
                _context.ReceiveChannel.BasicQos(0, 1, false);
                _context.ReceiveChannel.BasicConsume(_context.ReceiveQueueName, _context.IsAutoAck, consumer);

                _mqActionLogFunc("   初始化队列完毕");
            }
            catch (Exception ex)
            {
                _mqActionLogFunc("   初始化RabbitMQ出错:" + ex.Message + ex.StackTrace);
            }
        }

    }
}
View Code

使用到的参数 MQReceiverParam:

using System;
using System.Collections.Generic;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///     消费者入参
    /// </summary>
    public class MQReceiverParam
    {
        public  string _queueName { get; set; }
        public  bool _isAutoAck { get; set; }
        public  List<string> _mqUrls { get; set; }
        public  Func<byte[], bool> _processFunction { get; set; }
        public  Action<string> _mqActionLogFunc { get; set; }
        public MQConnectionFactory ConnectionFactoryParam { get; set; }

    }

    /// <summary>
    ///     服务配置
    /// </summary>
    public class MQConnectionFactory
    {
        public string HostName     {get;set;}
        public string UserName     {get;set;}
        public string Password     {get;set;}
        public string VirtualHost  {get;set;}
    }

}
View Code

重力要说明一下:

Func<byte[], bool> _processFunction { get; set; }
Action<string> _mqActionLogFunc { get; set; }

参数的对象中,有这么两个委托,原因是,如果你在学习 rabbitmq得这块内容的时候,你会发现,网上很多案例,以及官方提供的案例,写法都比较简单,而且,都是讲业务逻辑和   rabbitmq的消费的这跨功能 耦合到了一起

如果其他地方使用的时候,还是重复,创建  connection   创建queue,绑定,,,,,等相关动作,代码不仅不美观,而且显得繁琐,啰嗦,所以,这两个委托类型的参数,起到了接偶的作用,似的 具体的业务逻辑和 rabbitmq的消费逻辑 分离

使用如下:

(我是在窗体上直接放置了一个  richTextBox的控件,讲接收的信息打印出来,)

using Ecostar.MQConsumer.Core;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.IO;
using System.Reflection;
using System.Text;
using System.Windows.Forms;

namespace Ecostar.MQConsumer.UI
{
    [Export]
    public partial class MQMainForm : Form
    {

        #region Fields
        private static CompositionContainer _container;//MEF 部件组合 管理
        [Import]
        public IReceiver Receiver { get; set; }

        #endregion

        public MQMainForm()
        {
            InitializeComponent();
        }

        private void MQMainForm_Load(object sender, EventArgs e)
        {
            InitForm();
            InitialListener();
        }


        public void InitForm()
        {
            AggregateCatalog catalog = new AggregateCatalog();
            catalog.Catalogs.Add(new DirectoryCatalog(Directory.GetCurrentDirectory()));
            catalog.Catalogs.Add(new AssemblyCatalog(Assembly.GetExecutingAssembly()));
            _container = new CompositionContainer(catalog);
        }

        /// <summary>
        ///     初始化监听程序
        /// </summary>
        void InitialListener()
        {
            MQMainForm form;
            try
            {
                form = _container.GetExportedValue<MQMainForm>();
            }
            catch (Exception ex)
            {

                throw;
            }
            form.Receiver.InitialReceive(new MQReceiverParam()
            {
                _queueName = "testQueueName",
                _isAutoAck = false,
                _mqUrls = new List<string>() { "amqp://127.0.0.1:5672/" },
                _processFunction = (buffer) =>
                {
                    string receiveMsg = Encoding.UTF8.GetString(buffer);
                    this.rtb_receive.Invoke(new Action(() => { { this.rtb_receive.Text += receiveMsg + "
"; } }));
                    return true;

                },
                _mqActionLogFunc = (msg) =>
                {
                    this.rtb_receive.Invoke(new Action(() =>
                    {
                        this.rtb_receive.Text += "====MQ Action====" + msg + "
";
                    }));
                },
                ConnectionFactoryParam = new MQConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "CC",
                    Password = "123qwe",
                    VirtualHost = "/"
                }
            });
        }


    }
}
View Code

 其中的 testQueueName,是客户端发送的 消息列队名称,也就是queue的名称,你也可以(如果是测试),在mq服务器上 人为的添加这个queue名称之后再测试。

这样一来,_processFunction 这个用于消费的方法,可以,写任意的处理方式,比如打印到控制台,输出到床体 控件显示,写入到日志,写入到数据库等等。

而且中的 _mqActionLogFunc,适用于记录mq的消费过程的日志,比如 mq消费操作执行过程中发生异常 ,那么直接找mq的问题即可。


截图中还一个:MQContext类,这是一个部分类,为了方便区分,我把消费者,生产者  公共部分分别放置到了三个部分类中:

MQContext.Consumer.cs

using RabbitMQ.Client;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///        MQ 消费者
    /// </summary>
    public partial class MQContext
    {
        // <summary>
        /// 用户监听的Connection
        /// </summary>
        public IConnection ReceiveConnection { get; set; }

        /// <summary>
        /// 用于监听的Channel
        /// </summary>
        public IModel ReceiveChannel { get; set; }

        /// <summary>
        /// 监听队列名
        /// </summary>
        public string ReceiveQueueName { get; set; }
    }
}
View Code

MQContext.cs

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///     MQ 生产者消费者公共部分
    /// </summary>
    public partial class MQContext
    {
        /// <summary>
        /// mq地址
        /// </summary>
        public string MQUrl { get; set; }

    }
}
View Code

MQContext.Producer.cs

using RabbitMQ.Client;
using System;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///        MQ 生产者
    /// </summary>
    public partial class MQContext
    {
        /// <summary>
        /// 用于发送消息的Connection
        /// </summary>
        public IConnection SendConnection { get; set; }

        /// <summary>
        /// 用于发送消息到Channel
        /// </summary>
        public IModel SendChannel { get; set; }

        /// <summary>
        /// 发送的Exchange
        /// </summary>
        public string Exchange { get; set; }

        /// <summary>
        /// 是否启用自动删除
        /// </summary>
        public bool IsAutoAck { get; set; }
        /// <summary>
        /// 上下文ID
        /// </summary>
        public Guid Id { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string RouteKey { get; set; }
        /// <summary>
        /// 是否正在运行,默认false
        /// </summary>
        public bool IsRunning { get; set; }

        /// <summary>
        /// 回收此上下文
        /// </summary>
        public void Recovery()
        {
            IsRunning = false;
        }
    }
}
View Code

到此,这个简单的消费案例就完成了。


下面的是 生产者,(发送消息的案例),为了造数据,所以写的随意些:(一个控制台程序),nuget引入  rabbitmq.client ,指令: install-package rabbitmq.Client

using RabbitMQ.Client;
using System;
using System.Text;

namespace RubbitMQClient
{
    /// <summary>
    ///     1.Routing (按路线发送接收)
    /// </summary>
    public class RoutingType
    {
        public static void RoutingProducer(string[] arguments)
        {
            arguments = new string[] { "0","" };


            string serverAddress = "127.0.0.1";
            string account = "CC";
            string password = "123qwe";

            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = serverAddress,
                UserName = account,
                Password = password,
                VirtualHost = "/"
            };
            IConnection conn = factory.CreateConnection();
            for (int i = 0; i < 1000; i++)
            {
                arguments[1] = i.ToString();
                string queueName = "testQueueName";


                using (var channel = conn.CreateModel())
                {
                    //---1.声明durable Exchange 和 Queue--------------------------------------------------------------------------------------------------------------
                    channel.ExchangeDeclare(Consts.EXCHANGE_NAME_DIRECT, "direct", durable: true, autoDelete: false, arguments: null);
                    //arguments = new[] { "12321", "32432" };
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queueName, Consts.EXCHANGE_NAME_DIRECT, routingKey: "");//queueName

                    //----------------------------------------------------------------------------------------------------------------------

                    //---2.发布持久化消息到队列 ---------------------------------------------------------------------------------------------------
                    var props = channel.CreateBasicProperties();
                    //props.Priority = 3;//控制优先级
                    props.DeliveryMode = 2;//将信息也持久化
                    props.Persistent = true;///SetPersistent方式提示已经过时,建议使用当前方式
                    string severity = getSeverity(arguments);
                    string message = getMessage(arguments);
                    byte[] buffer = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(Consts.EXCHANGE_NAME_DIRECT, routingKey: "", basicProperties: props, body: buffer);

                    ////---消费消息
                    //BasicGetResult msgResponse = channel.BasicGet(queueName, noAck: true);

                    //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                    //Console.WriteLine(msgBody);


                    //3.1(发布方式还有一种 基于推送的事件订阅 )第二种方式(使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息)
                    //var consumer = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume(queueName, noAck: true, consumer: consumer);
                    //var msgResponse = consumer.Queue.Dequeue(); //blocking
                    //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);

                }

            }
            conn.Close();
            Console.ReadKey();
        }




        private static String getSeverity(String[] strings)
        {
            if (strings.Length < 1)
                return "routing(direct) type info";
            return strings[0];
        }

        private static String getMessage(String[] strings)
        {
            if (strings.Length < 2)
                return "routing(direct) --> Hello World!";
            return joinStrings(strings, " ", 1);
        }

        private static String joinStrings(String[] strings, String delimiter, int startIndex)
        {
            return strings[1].ToString();
        }





    }
}
View Code

抽时间将上面涉及到的   mq一些相关  属性(常用的API的),在总结下,主要是零散,其实东西很简单,如何更好的,更灵活的组合到一起,是这个插件使用的 最主要一点。

原文地址:https://www.cnblogs.com/Tmc-Blog/p/5433253.html