RabbitMQ学习笔记

一、开发环境搭建

1.安装Erlang环境

下载地址:OTP 18.3 Windows 64-bit Binary File

2.安装RabbitMQ服务端

下载地址:Windows

打开命令行工具,进入RabbitMQ目录下的sbin文件夹下,输入以下命令:

以服务的形式安装RabbitMQ

rabbitmq-service install

启动RabbitMQ服务

rabbitmq-service start

RabbitMQ所在的路径不能存在空格,否则会出现莫名其妙的错误。

3.下载RabbitMQ的客户端程序集。

下载地址:rabbitmq-dotnet-client-3.6.1-dotnet-4.5.zip

解压得到的RabbitMQ.Client.dll就是客户端的dll

4.启用管理界面工具

rabbitmq-plugins enable rabbitmq_management

在浏览器中输入地址:http://localhost:15672/#/可以访问管理页面

 

一、消息队列中消息的整体处理流程及名词解析

RabbitMQ处理消息流程图

 二、Exchange三种常用的模式

Fanout模式

    /// <summary>
    /// 生产者
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factor = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            using (var connection = factor.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var exchangeName = "exchange_Fanout";
                    channel.ExchangeDeclare(exchangeName, "fanout");
                    string message = "Hello,World";
                    channel.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(message));
                    Console.WriteLine("消息{0}被发送", message);
                }
            }
            Console.ReadKey();
        }
    }
    /// <summary>
    /// 消费端
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            var connection = factory.CreateConnection();
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ExchangeDeclare(exchange: "exchange_Fanout", type: "fanout");
                channel.QueueBind(queueName, "exchange_Fanout", "");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                };
                channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
                Console.ReadLine();//阻断主线程,否则channel将会被释放
            }
            Console.ReadKey();
        }
    }

Direct模式 

    /// <summary>
    /// 生产者
    /// </summary>
    class Program
    {
        static string queueName = "queue_direct";
        static string exchangeName = "exchange_direct";
        static string routingKey = "Hello";
        static void Main(string[] args)
        {
            var factor = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            using (var connection = factor.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //声明持久化队列
                    channel.QueueDeclare(queueName, true, false, false, null);
                    //声明持久化转发器
                    channel.ExchangeDeclare(exchangeName, "direct", true, false, null);
                    //绑定转发器和队列
                    channel.QueueBind(queueName, exchangeName, routingKey, null);
                    var property = channel.CreateBasicProperties();
                    //消息持久化
                    property.DeliveryMode = 2;
                    string message = "Hello,World";
                    channel.BasicPublish(exchangeName, routingKey, property, Encoding.UTF8.GetBytes(message));
                    Console.WriteLine("消息{0}被发送", message);
                }
            }
            Console.ReadKey();
        }
    }
    /// <summary>
    /// 消费端
    /// </summary>
    class Program
    {
        static string queueName = "queue_direct";
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            var connection = factory.CreateConnection();
            using (var channel = connection.CreateModel())
            {
                //声明持久化队列
                channel.QueueDeclare(queueName, true, false, false, null);
                //设置最大服务转发消息数量
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                    //消息应答
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                Console.ReadLine();
            }
            Console.ReadKey();
        }
    }

Topic模式 

    /// <summary>
    /// 生产者
    /// </summary>
    class Program
    {
        static string exchangeName = "exchange_topic";
        static string queueName = "queue_topic";
        static void Main(string[] args)
        {
            var factor = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            using (var connection = factor.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
                    channel.QueueDeclare(queueName, true, false, false, null);
                    channel.QueueBind(queueName, exchangeName, "Hello.Topic.*");
                    var property = channel.CreateBasicProperties();
                    //消息持久化
                    property.DeliveryMode = 2;
                    string message = "Hello,World";
                    channel.BasicPublish(exchangeName, "Hello.Topic.World", property, Encoding.UTF8.GetBytes(message));
                    Console.WriteLine("消息{0}被发送", message);
                }
            }
            Console.ReadKey();
        }
    }
    /// <summary>
    /// 消费端
    /// </summary>
    class Program
    {
        static string queueName = "queue_topic";
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };
            var connection = factory.CreateConnection();
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queueName, true, false, false, null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                Console.ReadLine();
            }
            Console.ReadKey();
        }
    }
原文地址:https://www.cnblogs.com/Jabben_Yi/p/5334340.html