MQ的调用

MQ 调用示例(需引用 RabbitMQ.Client 相关 DLL)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MqTest2
{
    /// <summary>
    /// Console host demonstrating how to receive from and publish to RabbitMQ
    /// through <see cref="MqHelper"/>.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Application entry point. Currently does nothing; call <see cref="GetData"/> or
        /// <see cref="SendData"/> from here to exercise the helper.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
        }

        /// <summary>
        /// Fetches a single message through <see cref="MqHelper.GetMQMsg"/>, decodes its body as
        /// UTF-8 text and writes it to standard output. Does nothing when no message was returned.
        /// </summary>
        public static void GetData()
        {
            MqHelper mqHelper = new MqHelper();
            var e = mqHelper.GetMQMsg();
            if (e == null)
            {
                // GetMQMsg returns null when nothing was dequeued within its timeout.
                return;
            }

            // NOTE(review): MqHelper starts its consumer with the second BasicConsume argument
            // set to false and nothing here acknowledges the delivery — confirm where the ack
            // is meant to happen.
            byte[] data = e.Body;
            string result = Encoding.UTF8.GetString(data);
            Console.WriteLine(result);
        }

        /// <summary>
        /// Publishes a sample <see cref="Student"/> through <see cref="MqHelper.sendMQMessage"/>
        /// and reports a failure on standard error instead of silently ignoring it.
        /// </summary>
        public static void SendData()
        {
            MqHelper mqHelper = new MqHelper();
            Student student = new Student()
            {
                name = "测试",
                Age = "12"
            };

            string errorMsg;
            bool sent = mqHelper.sendMQMessage(student, out errorMsg);
            if (!sent)
            {
                Console.Error.WriteLine(errorMsg);
            }
        }
    }

    /// <summary>
    /// Sample payload object used by the demo when publishing a message.
    /// Both members are plain read/write string properties.
    /// </summary>
    public class Student
    {
        /// <summary>Name of the student.</summary>
        public string name { get; set; }

        /// <summary>Age of the student, stored as text.</summary>
        public string Age { get; set; }
    }

}

MQ 帮助类(MqHelper,需引用 Newtonsoft.Json 与 RabbitMQ.Client)

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MqTest2
{
    /// <summary>
    /// Helper that owns a RabbitMQ connection and channel, consumes from a fixed queue and
    /// publishes <see cref="Student"/> messages to a fixed exchange.
    /// Dispose the instance to release the channels and the connection it opened.
    /// </summary>
    class MqHelper : IDisposable
    {
        /// <summary>Maximum number of attempts made when obtaining a channel.</summary>
        private const int MaxChannelAttempts = 4;

        /// <summary>Pause, in milliseconds, after discarding a closed channel before retrying.</summary>
        private const int ChannelRetryDelayMilliseconds = 1000;

        /// <summary>How long <see cref="GetMQMsg"/> waits for a delivery, in milliseconds.</summary>
        private const int DequeueTimeoutMilliseconds = 10000;

        /// <summary>Guards the channel, the named-channel map and the connection.</summary>
        private readonly object _syncRoot = new object();

        /// <summary>Channels created by <see cref="GetNamedChannel"/>, keyed by name.</summary>
        private readonly IDictionary<string, IModel> _channels;

        private IModel _channel;
        private QueueingBasicConsumer _consumer;

        /// <summary>Connection settings used to configure <see cref="Factory"/>.</summary>
        public RabbitMQConfigSection_New RabbitMQConfigSection { get; private set; }

        /// <summary>Factory used to open the RabbitMQ connection.</summary>
        public ConnectionFactory Factory { get; private set; }

        private IConnection Connection { get; set; }

        // Routing keys bound to the queue; separate multiple keys with commas.
        private readonly string RoutingKeys = "CDP.Finish";

        // Name of the queue that is declared and consumed.
        private readonly string QueueName = "Citms.Queue.VideoTest";

        // Name of the exchange the queue is bound to and messages are published to.
        private readonly string ExchangeName = "Citms.Exchange.Test";

        // Routing key used when publishing.
        private readonly string SendRoutingKey = "CDP.Finish";

        /// <summary>
        /// Configures the connection factory from <see cref="RabbitMQConfigSection_New"/> and
        /// immediately sets up the consumer via <see cref="CreateConsumer"/>.
        /// </summary>
        public MqHelper()
        {
            this._channels = new Dictionary<string, IModel>();
            this.RabbitMQConfigSection = new RabbitMQConfigSection_New();
            Factory = new ConnectionFactory();
            Factory.UserName = this.RabbitMQConfigSection.RabbitMQUserName;
            Factory.Password = this.RabbitMQConfigSection.RabbitMQPassword;
            Factory.VirtualHost = "/";
            Factory.Uri = this.RabbitMQConfigSection.RabbitMQUri;
            CreateConsumer();
        }

        /// <summary>
        /// Declares the queue, binds it to the exchange for every configured routing key and
        /// starts a <see cref="QueueingBasicConsumer"/> on it.
        /// </summary>
        /// <remarks>
        /// Exceptions raised by the client propagate unchanged, with their original stack trace.
        /// NOTE(review): the exchange is only declared in <see cref="sendMQMessage"/>; confirm it
        /// already exists on the broker before the bind below runs.
        /// </remarks>
        /// <exception cref="InvalidOperationException">No channel could be obtained.</exception>
        public void CreateConsumer()
        {
            IModel channel = GetNamedChannel(QueueName);
            if (channel == null)
            {
                throw new InvalidOperationException(
                    "Unable to obtain a RabbitMQ channel for queue '" + QueueName + "'.");
            }

            _channel = channel;
            channel.BasicQos(0, 100, false);
            channel.QueueDeclare(QueueName, true, false, false, null);

            if (!string.IsNullOrEmpty(RoutingKeys))
            {
                // Several routing keys may be bound to the same queue.
                string[] keys = RoutingKeys.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
                foreach (var key in keys)
                {
                    channel.QueueBind(QueueName, ExchangeName, key.Trim());
                }
            }

            _consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(QueueName, false, _consumer);
        }

        /// <summary>
        /// Waits up to ten seconds for the next delivery from the consumer queue.
        /// </summary>
        /// <returns>The delivery, or <c>null</c> when nothing was dequeued within the timeout.</returns>
        /// <remarks>
        /// When dequeuing throws and the consumer is missing or no longer running, the consumer
        /// is recreated before the original exception is rethrown.
        /// </remarks>
        public BasicDeliverEventArgs GetMQMsg()
        {
            try
            {
                BasicDeliverEventArgs delivery;
                bool received = this._consumer.Queue.Dequeue(DequeueTimeoutMilliseconds, out delivery);
                return received ? delivery : null;
            }
            catch (Exception)
            {
                if (this._consumer == null || !this._consumer.IsRunning)
                {
                    this.CreateConsumer();
                }
                throw;
            }
        }

        /// <summary>
        /// Serializes the student into a JSON envelope and publishes it to the exchange.
        /// </summary>
        /// <param name="iv">Student placed in the envelope's <c>Data</c> member.</param>
        /// <param name="errorMsg">Empty on success; otherwise a description of the failure.</param>
        /// <returns><c>true</c> when the message was published; <c>false</c> otherwise.</returns>
        public bool sendMQMessage(Student iv, out string errorMsg)
        {
            errorMsg = "";
            // NOTE(review): ReportedTime is a fixed literal — confirm whether the current time is intended.
            var envelope = new { DataType = "Student", Data = iv, ReportedTime = "20190826161051" };
            string value = JsonConvert.SerializeObject(envelope);
            byte[] bytes = Encoding.UTF8.GetBytes(value);
            try
            {
                // Lock on the private sync object rather than on the channel itself: the channel
                // field can be reassigned or null and is handed out to callers by GetChannel.
                lock (_syncRoot)
                {
                    IModel channel = _channel;
                    if (channel == null)
                    {
                        errorMsg = "RabbitMQ channel is not available.";
                        return false;
                    }

                    channel.ExchangeDeclare(ExchangeName, "direct", true);
                    channel.BasicPublish(ExchangeName, SendRoutingKey, null, bytes);
                    return true;
                }
            }
            catch (Exception ex)
            {
                errorMsg = ex.Message;
                return false;
            }
        }

        /// <summary>
        /// Returns the current channel, creating a new one when none exists and replacing it
        /// when it is no longer open. Makes at most four attempts.
        /// </summary>
        /// <returns>An open channel, or <c>null</c> when every attempt failed.</returns>
        public IModel GetChannel()
        {
            lock (this._syncRoot)
            {
                for (int attempt = 0; attempt < MaxChannelAttempts; ++attempt)
                {
                    try
                    {
                        if (this._channel == null)
                        {
                            this._channel = this.GetConnection().CreateModel();
                            return this._channel;
                        }

                        if (this._channel.IsOpen)
                        {
                            return this._channel;
                        }

                        // The channel exists but is closed: discard it, pause for a fixed
                        // delay, then let the next iteration create a fresh one.
                        this._channel.Dispose();
                        this._channel = null;
                        Thread.Sleep(ChannelRetryDelayMilliseconds);
                    }
                    catch (Exception)
                    {
                        // Best effort: a failed attempt is retried and the caller receives
                        // null once all attempts are exhausted.
                    }
                }

                return null;
            }
        }

        /// <summary>
        /// Returns the channel registered under <paramref name="name"/>, creating and registering
        /// one when absent and replacing it when it is no longer open. Makes at most four attempts.
        /// </summary>
        /// <param name="name">Key under which the channel is cached.</param>
        /// <returns>An open channel, or <c>null</c> when the attempts are exhausted.</returns>
        /// <remarks>Exceptions raised while creating or inspecting a channel propagate to the caller.</remarks>
        public IModel GetNamedChannel(string name)
        {
            lock (this._syncRoot)
            {
                for (int attempt = 0; attempt < MaxChannelAttempts; ++attempt)
                {
                    IModel channel;
                    if (!this._channels.TryGetValue(name, out channel))
                    {
                        channel = this.GetConnection().CreateModel();
                        this._channels[name] = channel;
                        return channel;
                    }

                    if (channel.IsOpen)
                    {
                        return channel;
                    }

                    // Cached channel is closed: drop it, pause for a fixed delay and retry.
                    channel.Dispose();
                    this._channels.Remove(name);
                    Thread.Sleep(ChannelRetryDelayMilliseconds);
                }

                return null;
            }
        }

        /// <summary>
        /// Returns the shared connection, opening it on first use and reopening it when the
        /// existing one is no longer open.
        /// </summary>
        /// <returns>The connection created by <see cref="Factory"/>.</returns>
        /// <remarks>Failures from the factory propagate with their original stack trace.</remarks>
        public IConnection GetConnection()
        {
            lock (_syncRoot)
            {
                if (this.Connection != null)
                {
                    if (this.Connection.IsOpen)
                    {
                        return this.Connection;
                    }

                    // Release the dead connection; a failure while disposing must not
                    // prevent a new connection from being created.
                    DisposeQuietly(this.Connection);
                    this.Connection = null;
                }

                this.Connection = this.Factory.CreateConnection();
                return this.Connection;
            }
        }

        /// <summary>
        /// Releases every channel created by this helper and the underlying connection.
        /// Errors raised while disposing are ignored.
        /// </summary>
        public void Dispose()
        {
            lock (_syncRoot)
            {
                foreach (IModel channel in _channels.Values)
                {
                    DisposeQuietly(channel);
                }
                _channels.Clear();

                DisposeQuietly(_channel);
                _channel = null;
                _consumer = null;

                DisposeQuietly(Connection);
                Connection = null;
            }
        }

        /// <summary>Disposes <paramref name="resource"/> when non-null, ignoring any exception.</summary>
        private static void DisposeQuietly(IDisposable resource)
        {
            if (resource == null)
            {
                return;
            }

            try
            {
                resource.Dispose();
            }
            catch (Exception)
            {
                // Cleanup is best effort; there is nothing useful to do with a failure here.
            }
        }

    }
    /// <summary>
    /// Connection settings for RabbitMQ, each pre-populated with a default value.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the broker address and credentials are hard-coded defaults in source;
    /// consider supplying them from external configuration.
    /// </remarks>
    public class RabbitMQConfigSection_New
    {
        /// <summary>AMQP URI of the broker.</summary>
        public string RabbitMQUri { get; set; } = "amqp://192.168.0.37:5672";

        /// <summary>User name used to authenticate.</summary>
        public string RabbitMQUserName { get; set; } = "citms";

        /// <summary>Password used to authenticate.</summary>
        public string RabbitMQPassword { get; set; } = "citms@b7";
    }
}
原文地址:https://www.cnblogs.com/macT/p/11452029.html