RabbitMQ (3): Publish/Subscribe

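RabbitMQ is not designed for producers to send messages directly to a queue.

Doing so would couple producers to specific queues and make the program harder to extend.

That is why the concept of exchanges exists: a producer publishes to an exchange, and the exchange routes the message to queues.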

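There are several exchange types: direct, topic, headers, and fanout.

An exchange can be given a name. There is also an unnamed (default) exchange, which is where the messages in the previous chapters were published. The snippet below declares a fanout exchange named "logs" and publishes to it; a fanout exchange ignores the routing key and delivers each message to every queue bound to it.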

channel.ExchangeDeclare(exchange: "logs", type: "fanout");

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

As I understand it, the exchange's main job is to control how messages are routed to queues.
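To make the fanout behaviour concrete, here is a toy in-memory model that needs no broker. ToyFanoutExchange, Bind, and Publish are hypothetical names invented for this sketch and are not part of RabbitMQ.Client; the only point is that every bound queue gets its own copy of each message and the routing key plays no role.

```csharp
using System;
using System.Collections.Generic;

namespace FanoutSketch
{
    // Toy stand-in for a fanout exchange; NOT part of RabbitMQ.Client.
    class ToyFanoutExchange
    {
        private readonly List<Queue<string>> boundQueues = new List<Queue<string>>();

        // Rough counterpart of QueueBind: remember the queue as a destination.
        public void Bind(Queue<string> queue)
        {
            boundQueues.Add(queue);
        }

        // Rough counterpart of BasicPublish: the routing key is accepted but
        // ignored, and every bound queue receives its own copy of the message.
        public void Publish(string routingKey, string message)
        {
            foreach (var queue in boundQueues)
            {
                queue.Enqueue(message);
            }
        }
    }

    class Program
    {
        static void Main()
        {
            var logs = new ToyFanoutExchange();
            var screenQueue = new Queue<string>();
            var fileQueue = new Queue<string>();

            logs.Bind(screenQueue);
            logs.Bind(fileQueue);

            logs.Publish("", "info: Hello World!");
            logs.Publish("ignored-key", "warn: disk almost full");

            // Both queues hold both messages, whatever the routing key was.
            Console.WriteLine("screen queue holds {0} message(s)", screenQueue.Count);
            Console.WriteLine("file queue holds {0} message(s)", fileQueue.Count);
        }
    }
}
```

A real broker also handles persistence, acknowledgements, and network delivery; this sketch models the routing rule only.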

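Temporary queues:

Declaring a queue determines how its messages are stored and managed. Calling QueueDeclare() with no arguments creates a non-durable, exclusive, auto-delete queue with a server-generated name, which suits a subscriber that only needs messages while it is connected.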

var queueName = channel.QueueDeclare().QueueName;
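For comparison, the parameterless call can be written out with its arguments spelled explicitly. This is a minimal sketch, assuming a 5.x/6.x RabbitMQ.Client (the versions that expose CreateModel) and a broker reachable on localhost with default credentials; the host name is a placeholder, not something taken from the original post.

```csharp
using RabbitMQ.Client;
using System;

namespace TempQueueSketch
{
    class Program
    {
        static void Main()
        {
            // Assumption: a local broker with default guest credentials.
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // An empty queue name asks the broker to generate one.
                var declared = channel.QueueDeclare(queue: "",
                                                    durable: false,   // not kept across broker restarts
                                                    exclusive: true,  // usable only by this connection
                                                    autoDelete: true, // removed once no longer in use
                                                    arguments: null);

                Console.WriteLine(" Temporary queue: {0}", declared.QueueName);
            }
        }
    }
}
```

Because the queue is exclusive, it disappears when the declaring connection closes, so a subscriber gets a fresh, empty queue each time it starts.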

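Bindings:

Finally, a binding associates the queue with the exchange, which tells the exchange to deliver messages to that queue.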

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");
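The effect of bindings is easiest to see with two queues bound to the same exchange. The following is a sketch under assumptions rather than a definitive test: it assumes a 5.x/6.x RabbitMQ.Client and a broker on localhost, and it polls with BasicGet purely to keep the example short (a real subscriber would use a consumer, as in the subscribe code in this post). The short sleep is a precaution, not a requirement documented by the library.

```csharp
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
using System.Threading;

namespace BindingSketch
{
    class Program
    {
        static void Main()
        {
            // Assumption: a local broker with default guest credentials.
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                // Two temporary queues, both bound to the same fanout exchange.
                var first = channel.QueueDeclare().QueueName;
                var second = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: first, exchange: "logs", routingKey: "");
                channel.QueueBind(queue: second, exchange: "logs", routingKey: "");

                var body = Encoding.UTF8.GetBytes("info: Hello World!");
                channel.BasicPublish(exchange: "logs",
                                     routingKey: "",
                                     basicProperties: null,
                                     body: body);

                // Give the broker a moment to route the message (precaution only).
                Thread.Sleep(200);

                foreach (var queueName in new[] { first, second })
                {
                    // BasicGet returns null when the queue is empty.
                    var result = channel.BasicGet(queue: queueName, autoAck: true);
                    var text = result == null
                        ? "(no message)"
                        : Encoding.UTF8.GetString(result.Body.ToArray());
                    Console.WriteLine(" {0}: {1}", queueName, text);
                }
            }
        }
    }
}
```

If the bindings are in place, each queue should report its own copy of the message; a queue that was declared but never bound would report nothing.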

Publish:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RMQ_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

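                var message = GetMessage(args);
                Console.WriteLine(" Press Ctrl+C to exit.");

                // Publish continuously, appending a counter that cycles from 100 down to 0.
                int i = 100;
                while (true)
                {
                    var payload = message + ":" + i.ToString();
                    var body = Encoding.UTF8.GetBytes(payload);
                    channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);
                    // Log the payload that was actually sent, counter included.
                    Console.WriteLine(" [x] Sent {0}", payload);

                    if (i-- == 0)
                        i = 100;

                    // Throttle the loop so the broker and the console are not flooded.
                    Thread.Sleep(100);
                }
            }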
        }

        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0)
                   ? string.Join(" ", args)
                   : "info: Hello World!");
        }
    }
}

Subscribe:

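            // Body of Main. It needs the same using directives as the publisher,
            // plus "using RabbitMQ.Client.Events;" for EventingBasicConsumer.
            var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };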
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
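                    // Body is byte[] in client 5.x and ReadOnlyMemory<byte> in 6.x;
                    // ToArray() (with System.Linq imported) works for both.
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);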
                    
                    Console.WriteLine(" [x] {0}", message);
                    Thread.Sleep(100);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

Original article: https://www.cnblogs.com/weichao975/p/8079381.html