asp.net 使用rabbitmq事例

本例asp.net 使用rabbitmq需求背景:为了提升用户体验,用户点击下单按钮,后台先做一些简单必要的操作,返回给用户一个友好提示(比如提示处理中,或者订单状态为处理中),然后发通过发消息给队列,把耗时久的操作留给rabbitmq队列处理。

 1、生产者封装类:

public class Publisher
    {
        private readonly string _exchange;
        private readonly string _hostName;
        private readonly string _password;
        private readonly Uri _uri;
        private readonly string _userName;
        private readonly string _virtualHost;

        /// <param name="exchange"></param>
        /// <param name="hostName"></param>
        /// <param name="userName"></param>
        /// <param name="password"></param>
        /// <param name="virtualHost"></param>
        /// <param name="uri">AMQP Address</param>
        public Publisher(string exchange, string hostName, string userName, string password, string virtualHost, Uri uri)
        {
            _hostName = hostName;
            _exchange = exchange;
            _userName = userName;
            _password = password;
            _virtualHost = virtualHost;
            _uri = uri;

            Factory = new ConnectionFactory
            {
                HostName = _hostName,
                UserName = _userName,
                Password = _password,
                VirtualHost = _virtualHost,
                Endpoint = new AmqpTcpEndpoint(_uri),
                RequestedHeartbeat = 0
            };
            Factory.RequestedHeartbeat = 0;
        }

        public string HostName
        {
            get { return _hostName; }
        }

        public string Exchange
        {
            get { return _exchange; }
        }

        public string UserName
        {
            get { return _userName; }
        }

        public string Password
        {
            get { return _password; }
        }

        public string VirtualHost
        {
            get { return _virtualHost; }
        }

        public Uri Uri
        {
            get { return _uri; }
        }

        public ConnectionFactory Factory { get; private set; }

        /// <summary>
        /// 直连式交换机,发消息
        /// </summary>
        /// <param name="queueName">队列名</param>
        /// <param name="message">消息</param>
        /// <param name="durable">消息是否持久化</param>
        public void PublishDirectMessage(string queueName, string message, bool durable=false)
        {
            if (null == Factory)
            {
                throw new ArgumentException("connection factory initialization error");
            }

            if (string.IsNullOrWhiteSpace(message))
            {
                throw new ArgumentNullException("message can not be null.");
            }

            if (string.IsNullOrWhiteSpace(Exchange))
            {
                throw new ArgumentNullException("exchange can not be null.");
            }

            try
            {
                using (var connection = Factory.CreateConnection())
                {
                    //通道 (Channel),在C#客户端里叫Model(不明白为什么这么取名字),其他客户端基本都叫Channel
                    using (var channel = connection.CreateModel())
                    {
                        //定义交换机
                        channel.ExchangeDeclare(Exchange, ExchangeType.Direct, durable: durable,
                            autoDelete: false, arguments: null);

                        //定义队列,如果名称相同不会重复创建
                        channel.QueueDeclare(queueName, durable: durable, exclusive: false,
                            autoDelete: false, arguments: null);

                        //绑定
                        channel.QueueBind(queueName, Exchange, routingKey: queueName);

                        //消息可持久化
                        IBasicProperties props = null;
                        if (durable) {
                            props = channel.CreateBasicProperties();
                            props.SetPersistent(true);
                        }                        

                        //发送消息到队列
                        var msgBody = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(Exchange, routingKey: queueName, basicProperties: props,
                            body: msgBody);
                    }
                }
            }
            catch (Exception ex)
            {
                LogHelper.Log(LogCategory.Error, ex.Message, ex);
            }
        }

        /// <summary>
        /// Fanout(广播)式交换机
        /// </summary>
        /// <param name="message"></param>
        public void PublishFanoutMessage(string message)
        {
            if (null == Factory)
            {
                throw new ArgumentException("connection factory initialization error");
            }

            if (string.IsNullOrWhiteSpace(message))
            {
                throw new ArgumentNullException("message can not be null.");
            }

            if (string.IsNullOrWhiteSpace(Exchange))
            {
                throw new ArgumentNullException("exchange can not be null.");
            }

            try
            {
                using (var connection = Factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(Exchange, "", null, body);
                    }
                }
            }
            catch (Exception ex)
            {
                LogHelper.Log(LogCategory.Error, ex.Message, ex);
            }
        }
    }
生产者

2、消费者封装类:

public delegate void ReceiveMessageHandle(string inputStr);
    public delegate bool ReceiveAnswerMessageHandle(string inputStr);

    /// <typeparam name="T">要接收的数据类型</typeparam>
    public class Subscriber<T> : IDisposable
    {
        private readonly string _exchange;
        private readonly string _hostName;
        private readonly string _password;
        private readonly Uri _uri;
        private readonly string _userName;
        private readonly string _virtualHost;

        /// <param name="exchange"></param>
        /// <param name="hostName"></param>
        /// <param name="userName"></param>
        /// <param name="password"></param>
        /// <param name="virtualHost"></param>
        /// <param name="uri">AMQP Address</param>
        public Subscriber(string exchange, string hostName, string userName, string password, string virtualHost,
            Uri uri)
        {
            _hostName = hostName;
            _exchange = exchange;
            _userName = userName;
            _password = password;
            _virtualHost = virtualHost;
            _uri = uri;

            Factory = new ConnectionFactory
            {
                HostName = _hostName,
                UserName = _userName,
                Password = _password,
                VirtualHost = _virtualHost,
                Endpoint = new AmqpTcpEndpoint(_uri),
                RequestedHeartbeat = 0
            };

            Connection = Factory.CreateConnection();
            Channel = Connection.CreateModel();
        }

        public string HostName
        {
            get { return _hostName; }
        }

        public string Exchange
        {
            get { return _exchange; }
        }

        public string UserName
        {
            get { return _userName; }
        }

        public string Password
        {
            get { return _password; }
        }

        public string VirtualHost
        {
            get { return _virtualHost; }
        }

        public Uri Uri
        {
            get { return _uri; }
        }

        public ConnectionFactory Factory { get; private set; }

        public IModel Channel { get; private set; }
        public IConnection Connection { get; private set; }
        public EventingBasicConsumer Consumer { get; private set; }

        private string QueueName { get; set; }

        public string Message { get; set; }

        //public delegate string MessageHandle();

        /// <summary>
        /// 手动释放
        /// </summary>
        void IDisposable.Dispose()
        {
            if (Channel != null)
            {
                Consumer = null;
                Channel.Close();
                Channel.Dispose();
            }

            if (Connection != null)
            {
                Consumer = null;
                Connection.Close();
                Connection.Dispose();
            }

            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// 托管释放
        /// </summary>
        ~Subscriber()
        {
            if (Channel != null)
            {
                Consumer = null;
                Channel.Close();
                Channel.Dispose();
            }

            if (Connection != null)
            {
                Consumer = null;
                Connection.Close();
                Connection.Dispose();
            }
        }

        public ReceiveMessageHandle ReceiveMessageHandler { get; set; }
        public ReceiveAnswerMessageHandle ReceiveAnswerMessageHandle { get; set; }

        /// <summary>
        /// 接受广播消息
        /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
        /// </summary>
        public void ReceiveFanoutMessage(int tryTimes = 3)
        {
            try
            {
                //Channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                //QueueName = Channel.QueueDeclare().QueueName;
                //Channel.QueueBind(QueueName, Exchange, "");
                //Consumer = new EventingBasicConsumer(Channel);
                //Consumer.Received += (model, dlvrArgs) =>
                //{
                //    byte[] body = dlvrArgs.Body;
                //    Message = Encoding.UTF8.GetString(body);
                //    ReceiveMessageHandler(Message);
                //};
                //Channel.BasicConsume(QueueName, true, Consumer);

                Channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                QueueName = Channel.QueueDeclare().QueueName;
                Channel.QueueBind(QueueName, Exchange, "");
                Consumer = new EventingBasicConsumer(Channel);
                Consumer.Received += (model, dlvrArgs) =>
                {
                    byte[] body = dlvrArgs.Body;
                    if (body != null && body.Length > 0)
                    {
                        Message = Encoding.UTF8.GetString(body);
                        if (!string.IsNullOrWhiteSpace(Message))
                        {
                            bool isConsumeSuccess = false;// 是否消费成功
                            int consumeCount = 0;//尝试消费次数
                            while (!isConsumeSuccess)
                            {
                                consumeCount++;
                                isConsumeSuccess = ReceiveAnswerMessageHandle(Message);
                                if (isConsumeSuccess || consumeCount >= tryTimes)
                                {
                                    Channel.BasicAck(dlvrArgs.DeliveryTag, false);//将队列里面的消息进行释放
                                    isConsumeSuccess = true;
                                }
                                else
                                {
                                    //重新放入队列,等待再次消费
                                    Channel.BasicAck(dlvrArgs.DeliveryTag, true);
                                }
                            }
                        }
                    }
                };
                Channel.BasicConsume(QueueName, false, Consumer);
            }
            catch (Exception ex)
            {
                LogHelper.Log(LogCategory.Error, ex.Message, ex);
            }
        }

        /// <summary>
        /// 接受直连交换机消息
        /// </summary>
        /// <param name="queueName">队列名</param>
        /// <param name="durable">消息是否持久化</param>
        /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
        /// <returns></returns>
        public void ReceiveDirectMessage(string queueName, bool durable = false, int tryTimes = 3)
        {
            try
            {
                Channel.ExchangeDeclare(Exchange, ExchangeType.Direct, durable: durable, autoDelete: false, arguments: null);
                Channel.QueueDeclare(queueName, durable: durable, exclusive: false, autoDelete: false, arguments: null);
                Channel.QueueBind(queueName, Exchange, routingKey: queueName);
                //订阅模式 (有消息到达将被自动接收) 消费者 
                Consumer = new EventingBasicConsumer(Channel);
                //绑定消息接收后的事件委托 
                Consumer.Received += (model, dlvrArgs) =>
                {
                    byte[] body = dlvrArgs.Body;
                    if (body != null && body.Length > 0)
                    {
                        Message = Encoding.UTF8.GetString(body);
                        if (!string.IsNullOrWhiteSpace(Message))
                        {
                            bool isConsumeSuccess = false;// 是否消费成功
                            int consumeCount = 0;//尝试消费次数
                            while (!isConsumeSuccess)
                            {
                                consumeCount++;
                                isConsumeSuccess = ReceiveAnswerMessageHandle(Message);
                                if (isConsumeSuccess || consumeCount >= tryTimes)
                                {
                                    Channel.BasicAck(dlvrArgs.DeliveryTag, false);//将队列里面的消息进行释放
                                    isConsumeSuccess = true;
                                }
                                else
                                {
                                    //重新放入队列,等待再次消费
                                    Channel.BasicAck(dlvrArgs.DeliveryTag, true);
                                }
                            }
                        }
                    }
                };
                Channel.BasicConsume(queueName, false, Consumer);
            }

            catch (Exception ex)
            {
                LogHelper.Log(LogCategory.Error, ex.Message, ex);
            }
        }

        public T ToJson()
        {
            return JsonConvert.DeserializeObject<T>(Message);
        }
    }
消费者

3、新建控制台程序,发送消息:

 class Program
    {
        public static string MqUri = ConfigHelper.GetConfig("RabbitMQ", "MqUri");
        public static string MqExchange = ConfigHelper.GetConfig("RabbitMQ", "MqExchange");
        public static string MqHostName = ConfigHelper.GetConfig("RabbitMQ", "MqHostName");
        public static string MqUserName = ConfigHelper.GetConfig("RabbitMQ", "MqUserName");
        public static string MqPassword = ConfigHelper.GetConfig("RabbitMQ", "MqPassword");

        static void Main(string[] args)
        {
            string userCommand = "";  
            while (userCommand != "exit")
            {
                Console.WriteLine("请输入:");
                userCommand = Console.ReadLine();

                //发送消息
                var publisher = new Publisher(MqExchange, MqHostName,
                                MqUserName, MqPassword, "/", new Uri(MqUri));

                publisher.PublishFanoutMessage(userCommand);
            }           
        }
    }
发送消息控制台应用

4、新建控制台程序,接受消息:

internal class Program
    {
        public static string MqUri = ConfigHelper.GetConfig("RabbitMQ", "MqUri");
        public static string MqExchange = ConfigHelper.GetConfig("RabbitMQ", "MqExchange");
        public static string MqHostName = ConfigHelper.GetConfig("RabbitMQ", "MqHostName");
        public static string MqUserName = ConfigHelper.GetConfig("RabbitMQ", "MqUserName");
        public static string MqPassword = ConfigHelper.GetConfig("RabbitMQ", "MqPassword");

        static void Main(string[] args)
        {
            Console.WriteLine("Start process data {0}", DateTime.Now);
            try
            {
                var subscriber = new Subscriber<string>(
                    MqExchange, MqHostName, MqUserName, MqPassword, "/",
                    new Uri(MqUri))
                {
                    ReceiveAnswerMessageHandle = SubscriberHandler1
                };

                subscriber.ReceiveFanoutMessage();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

        private static bool SubscriberHandler1(string msg)
        {
            Console.WriteLine(msg);
            return true;
        }
    }
接受消息控制台应用

5、配置文件:

上面两个控制台的配置文件一样,如下:

<RabbitMQ>
<add key="MqUri" value="amqp://localhost/" />
<add key="MqExchange" value="MyExchange" />
<add key="MqHostName" value="localhost" />
<add key="MqUserName" value="root" />
<add key="MqPassword" value="root" />
</RabbitMQ>

6、结果图:

原文地址:https://www.cnblogs.com/qk2014/p/9173269.html