RabbitMQ

#region RabbitMQ
            // Producer sample: declares the non-durable queue "Test" and publishes the
            // strings "0".."9" to it through the default exchange, then waits for Enter.
            // NOTE: host and credentials are hard-coded demo values.
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "sa",
                Password = "123321"
            };
            string queue = "Test";

            using (var connection = factory.CreateConnection()) // open the connection
            using (var channel = connection.CreateModel())      // open a channel on it
            {
                channel.QueueDeclare(
                    queue: queue,      // queue name
                    durable: false,    // queue definition is not persisted by the broker
                    exclusive: false,  // not restricted to this connection (an exclusive queue is removed when its connection closes, durable or not)
                    autoDelete: false, // do not delete the queue automatically once its last consumer unsubscribes
                    arguments: null);  // no optional queue arguments

                // No BasicQos here: prefetch limits only affect deliveries to consumers,
                // and this channel only publishes.
                for (int i = 0; i < 10; i++)
                {
                    string message = i.ToString();
                    byte[] body = Encoding.UTF8.GetBytes(message);

                    // The default exchange ("") routes a message to the queue whose
                    // name equals the routing key, so the queue name is used as the key.
                    channel.BasicPublish("", queue, null, body);
                    Console.WriteLine("set {0}", message);
                }
            }
            Console.ReadLine();
            #endregion
添加生产者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    /// <summary>
    /// Demo consumer based on <see cref="EventingBasicConsumer"/>: receives messages
    /// from the "Test" queue, simulates a random amount of work per message and
    /// acknowledges each message manually.
    /// </summary>
    class Program
    {
        // One shared generator instead of constructing a new Random per call.
        private static readonly Random Rng = new Random();

        // Random is not thread-safe and GetRandom is called from the Received
        // callback, which the client library invokes off the main thread.
        private static readonly object RngGate = new object();

        /// <summary>
        /// Connects to the local broker, subscribes to the "Test" queue and processes
        /// deliveries until the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // NOTE: host and credentials are hard-coded demo values.
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "sa",
                Password = "123321"
            };
            string queue = "Test";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the queue (non-durable, non-exclusive, no auto-delete).
                channel.QueueDeclare(queue, false, false, false, null);

                // Limit this consumer to one unacknowledged message at a time.
                // With global = false the limit applies to consumers started AFTER
                // this call, so it has to come before BasicConsume.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                // Attach the handler before consuming starts; a delivery that arrives
                // while no handler is attached would never be processed or acked.
                consumer.Received += (sender, ea) =>
                {
                    int s = GetRandom();
                    Thread.Sleep(s); // simulated processing time
                    string message = Encoding.UTF8.GetString(ea.Body);
                    channel.BasicAck(ea.DeliveryTag, false); // ack this delivery only
                    Console.WriteLine("Received {0} 耗费 {1} 毫秒", message, s);
                };

                Console.WriteLine("ConsoleApplication2 waiting for message.");

                // Second argument false = manual acknowledgements (no auto-ack).
                channel.BasicConsume(queue, false, consumer);
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Returns a random simulated processing time in milliseconds.
        /// </summary>
        /// <returns>A value in the range [100, 10000).</returns>
        private static int GetRandom()
        {
            lock (RngGate)
            {
                return Rng.Next(100, 10000);
            }
        }
    }
}
添加消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication3
{
    /// <summary>
    /// Demo consumer based on <see cref="QueueingBasicConsumer"/>: pulls deliveries
    /// for the "Test" queue from the consumer's local queue in a loop, simulates a
    /// random amount of work per message and acknowledges each message manually.
    /// </summary>
    class Program
    {
        // One shared generator instead of constructing a new Random per call.
        // Only the main loop uses it, so no synchronization is needed.
        private static readonly Random Rng = new Random();

        /// <summary>
        /// Connects to the local broker, subscribes to the "Test" queue and processes
        /// deliveries in an endless loop.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // NOTE: host and credentials are hard-coded demo values.
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "sa",
                Password = "123321"
            };
            string queue = "Test";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the queue (non-durable, non-exclusive, no auto-delete).
                channel.QueueDeclare(queue, false, false, false, null);

                // Limit this consumer to one unacknowledged message at a time.
                // With global = false the limit applies to consumers started AFTER
                // this call, so it has to come before BasicConsume.
                channel.BasicQos(0, 1, false);

                var consumer = new QueueingBasicConsumer(channel);
                Console.WriteLine("ConsoleApplication3 waiting for message.");

                // Second argument false = manual acknowledgements (no auto-ack).
                channel.BasicConsume(queue, false, consumer);

                while (true)
                {
                    // Blocks until the next delivery is available.
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    // Simulate the processing time for the message just received.
                    int s = GetRandom();
                    Thread.Sleep(s);

                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine("Received {0} 耗费 {1} 毫秒", message, s);
                    channel.BasicAck(ea.DeliveryTag, false); // ack this delivery only
                }
            }
        }

        /// <summary>
        /// Returns a random simulated processing time in milliseconds.
        /// </summary>
        /// <returns>A value in the range [1000, 10000).</returns>
        private static int GetRandom()
        {
            return Rng.Next(1000, 10000);
        }
    }
}
添加消费者2
#region RabbitMQ direct
// Disabled sample (kept commented out): deletes and re-declares the durable direct
// exchange "Exchange1", declares the durable queues "Test1" and "Test2", binds each
// with its own name as routing key, and publishes "0".."9" to each queue.
//try
//{
//    var factory = new ConnectionFactory(); // instantiate the connection factory
//    factory.HostName = "localhost";
//    factory.UserName = "guest";
//    factory.Password = "guest";
//    string ExchangeName = "Exchange1";
//    using (var connection = factory.CreateConnection()) // open the connection
//    {
//        using (var channel = connection.CreateModel())  // open a channel
//        {
//            //channel.ExchangeDeclare(ExchangeName, "direct", false, false, null);
//            //channel.QueueDeclare(
//            //    queue: queueName, // queue name
//            //    durable: false, // whether the queue is durable (persisted)
//            //    exclusive: false, // whether the queue is exclusive to this connection (channels of the same connection can share it); an exclusive queue is removed when the connection closes, whether or not it is durable
//            //    autoDelete: false, // whether the queue is deleted automatically once its last consumer unsubscribes
//            //    arguments: null  // optional arguments
//            //    ); // create the queue
//            //// bind the queue to the exchange
//            //channel.QueueBind(ExchangeName, queueName, routingKey: queueName);
//            ////channel.BasicQos(0, 1, false); // QoS = quality of service
//            channel.ExchangeDelete(ExchangeName);
//            //channel.QueueDelete(queueName);
//            channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);

//            string queueName1 = "Test1";
//            channel.QueueDeclare(queueName1, durable: true, autoDelete: false, exclusive: false, arguments: null);
//            channel.QueueBind(queueName1, ExchangeName, routingKey: queueName1);
//            for (int i = 0; i < 10; i++)
//            {
//                string message = i.ToString();
//                var body = Encoding.UTF8.GetBytes(message); // convert to a byte sequence
//                channel.BasicPublish(
//                    exchange: ExchangeName,
//                    routingKey: queueName1,
//                    basicProperties: null, // null = no properties set; for persistent messages pass properties with DeliveryMode=2 (Non-persistent (1) or persistent (2))
//                    body: body); // note: with a direct exchange the routing key is set to the queue name, matching the binding above
//                Console.WriteLine("set {0}", message);
//            }

//            string queueName2 = "Test2";
//            channel.QueueDeclare(queueName2, durable: true, autoDelete: false, exclusive: false, arguments: null);
//            channel.QueueBind(queueName2, ExchangeName, routingKey: queueName2);
//            for (int i = 0; i < 10; i++)
//            {
//                string message = i.ToString();
//                var body = Encoding.UTF8.GetBytes(message); // convert to a byte sequence
//                channel.BasicPublish(
//                    exchange: ExchangeName,
//                    routingKey: queueName2,
//                    basicProperties: null, // null = no properties set; for persistent messages pass properties with DeliveryMode=2 (Non-persistent (1) or persistent (2))
//                    body: body); // note: with a direct exchange the routing key is set to the queue name, matching the binding above
//                Console.WriteLine("set {0}", message);
//            }
//        }
//    }
//    Console.ReadLine();
//}
//catch (Exception)
//{

//    throw;
//}
#endregion

#region RabbitMQ Fanout
// Disabled sample (kept commented out): deletes and re-declares "Exchange1" as a
// durable fanout exchange, declares and binds the durable queue "Test1", and
// publishes "0".."9" to the exchange. The "Test2" part is additionally commented out.
//try
//{
//    var factory = new ConnectionFactory(); // instantiate the connection factory
//    factory.HostName = "localhost";
//    factory.UserName = "guest";
//    factory.Password = "guest";
//    string ExchangeName = "Exchange1";
//    using (var connection = factory.CreateConnection()) // open the connection
//    {
//        using (var channel = connection.CreateModel())  // open a channel
//        {
//            channel.ExchangeDelete(ExchangeName);
//            //channel.QueueDelete(queueName);
//            string queueName1 = "Test1";

//            channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
//            channel.QueueDeclare(queueName1, durable: true, autoDelete: false, exclusive: false, arguments: null);
//            channel.QueueBind(queueName1, ExchangeName, routingKey: queueName1);
//            for (int i = 0; i < 10; i++)
//            {
//                string message = i.ToString();
//                var body = Encoding.UTF8.GetBytes(message); // convert to a byte sequence
//                channel.BasicPublish(
//                    exchange: ExchangeName,
//                    routingKey: queueName1,
//                    basicProperties: null, // null = no properties set; for persistent messages pass properties with DeliveryMode=2 (Non-persistent (1) or persistent (2))
//                    body: body); // note: a fanout exchange ignores the routing key and delivers to every bound queue (the original remark here was copied from the direct sample)
//                Console.WriteLine("set {0}", message);
//            }

//            //string queueName2 = "Test2";
//            //channel.QueueDeclare(queueName2, durable: true, autoDelete: false, exclusive: false, arguments: null);
//            //channel.QueueBind(queueName2, ExchangeName, routingKey: queueName2);
//            //for (int i = 0; i < 10; i++)
//            //{
//            //    string message = i.ToString();
//            //    var body = Encoding.UTF8.GetBytes(message); // convert to a byte sequence
//            //    channel.BasicPublish(
//            //        exchange: ExchangeName,
//            //        routingKey: queueName2,
//            //        basicProperties: null, // null = no properties set; for persistent messages pass properties with DeliveryMode=2 (Non-persistent (1) or persistent (2))
//            //        body: body); // note: a fanout exchange ignores the routing key and delivers to every bound queue
//            //    Console.WriteLine("set {0}", message);
//            //}
//        }
//    }
//    Console.ReadLine();
//}
//catch (Exception)
//{

//    throw;
//}
#endregion

#region RabbitMQ Topic
// Topic-exchange sample: declares the durable topic exchange "Exchange1", then for
// each of the queues "Test1", "Test2" and "Test3" declares the durable queue, binds
// it with its own name as binding key and publishes ten persistent messages
// ("<queue>0" .. "<queue>9") to it. Finally waits for Enter.
//
// The bare block keeps the locals scoped to this sample. The former
// try { ... } catch (Exception) { throw; } wrapper only rethrew and was dropped;
// exceptions propagate to the caller exactly as before.
{
    // NOTE: host and credentials are hard-coded demo values.
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    string exchangeName = "Exchange1";
    string[] queueNames = { "Test1", "Test2", "Test3" };

    using (var connection = factory.CreateConnection()) // open the connection
    using (var channel = connection.CreateModel())      // open a channel on it
    {
        // To start from a clean broker state, delete the old objects first:
        //channel.ExchangeDelete(exchangeName);
        //foreach (string name in queueNames) channel.QueueDelete(name);

        // Mark published messages as persistent (delivery mode 2).
        IBasicProperties properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

        // Same declare / bind / publish sequence for every queue, in order.
        foreach (string queueName in queueNames)
        {
            channel.QueueDeclare(queueName, durable: true, autoDelete: false, exclusive: false, arguments: null);

            // The binding key has no wildcards, so only messages whose routing key
            // equals the queue name are routed to this queue.
            channel.QueueBind(queueName, exchangeName, routingKey: queueName);

            for (int i = 0; i < 10; i++)
            {
                string message = queueName + i.ToString();
                byte[] body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                    exchange: exchangeName,
                    routingKey: queueName,
                    basicProperties: properties,
                    body: body);
                Console.WriteLine("set {0}", message);
            }
        }
    }
    Console.ReadLine();
}
#endregion
direct、Fanout、Topic 三种交换机
原文地址:https://www.cnblogs.com/Jacob-Wu/p/10267771.html