RabbitMQ系列——(4)Exchange Type -- Headers

  // headers类型的exchange不依赖于routing key、binding key的匹配规则来路由消息,
  // 而是根据发送消息内容中的headers属性进行匹配。
  // 在绑定queue、exchange时指定一组键值对以及x-match参数,
  // x-match参数是字符串类型,可以设置为any、all。
  // any,表示只要匹配到headers表中的任何一个键值即可;
  // all,表示需要全部匹配。

1. 生产者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using RabbitMQMsgProducer.ExchangeDemo;

namespace RabbitMQMsgProducer
{
    /// <summary>
    /// Console entry point for the producer side of the headers-exchange demo.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the headers-exchange producer and then waits for a line of console input.
        /// Any exception raised along the way is reported by printing its message.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Headers
                // A headers exchange does not route by matching the routing key against a
                // binding key; it matches on the headers property carried by the published
                // message instead.
                // A binding between queue and exchange supplies a set of key/value pairs plus
                // an x-match argument, a string that may be "any" or "all":
                //   any - matching any single key/value pair of the headers table is enough;
                //   all - every pair has to match.
                ProducerHeadersExchange.Send();

                Console.ReadLine();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgProducer.ExchangeDemo
{
    /// <summary>
    /// Producer for the headers-exchange demo: sets up one exchange/queue pair bound with
    /// x-match = "all" and one bound with x-match = "any", then publishes sample messages
    /// whose "aaa"/"bbb" headers do or do not agree with the binding values.
    /// </summary>
    public class ProducerHeadersExchange
    {
        /// <summary>
        /// Declares both demo topologies, publishes the sample messages to each exchange and
        /// finally waits for a key press.
        /// </summary>
        public static void Send()
        {
            const string allExchangeName = "AllHeadersExchange";
            const string anyExchangeName = "AnyHeadersExchange";
            const string allHeadersQueueName = "AllHeadersQueue";
            const string anyHeadersQueueName = "AnyHeadersQueue";

            var factory = new ConnectionFactory
            {
                HostName = "localhost", // broker address
                UserName = "guest",     // user name
                Password = "guest"      // password
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Scenario 1: the binding uses x-match = "all".
                DeclareAndBind(channel, allExchangeName, allHeadersQueueName, "all");
                Console.WriteLine("******** all is ok , send some message .************");
                PublishWithHeaders(channel, allExchangeName, "all : aaa and bbb is ok.", "111", "222");
                PublishWithHeaders(channel, allExchangeName, "all : aaa is ok and bbb isn't ok.", "111", "333");

                // Scenario 2: the binding uses x-match = "any".
                DeclareAndBind(channel, anyExchangeName, anyHeadersQueueName, "any");
                Console.WriteLine("******** any is ok , send some message .************");
                PublishWithHeaders(channel, anyExchangeName, "any : aaa and bbb is ok.", "111", "222");
                PublishWithHeaders(channel, anyExchangeName, "any : aaa is ok and bbb isn't ok.", "111", "333");
                PublishWithHeaders(channel, anyExchangeName, "any : aaa isn't ok and bbb isn't ok.", "222", "333");
            }

            Console.ReadKey();
        }

        /// <summary>
        /// Declares a durable headers exchange and a durable queue, then binds the queue to the
        /// exchange with the given x-match mode and the fixed pairs aaa=111, bbb=222.
        /// </summary>
        /// <param name="channel">Channel used to issue the declarations.</param>
        /// <param name="exchangeName">Name of the headers exchange.</param>
        /// <param name="queueName">Name of the queue to declare and bind.</param>
        /// <param name="matchMode">Value of the x-match binding argument ("all" or "any").</param>
        private static void DeclareAndBind(IModel channel, string exchangeName, string queueName, string matchMode)
        {
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var bindingArguments = new Dictionary<string, object>
            {
                ["x-match"] = matchMode,
                ["aaa"] = "111",
                ["bbb"] = "222"
            };
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: bindingArguments);
        }

        /// <summary>
        /// Publishes one UTF-8 encoded text message carrying "aaa" and "bbb" headers, with an
        /// empty routing key, and logs the send to the console.
        /// </summary>
        /// <param name="channel">Channel used to publish.</param>
        /// <param name="exchangeName">Target exchange.</param>
        /// <param name="text">Message text; also echoed in the console log line.</param>
        /// <param name="aaaValue">Value of the "aaa" header.</param>
        /// <param name="bbbValue">Value of the "bbb" header.</param>
        private static void PublishWithHeaders(IModel channel, string exchangeName, string text, string aaaValue, string bbbValue)
        {
            var properties = channel.CreateBasicProperties();
            properties.Headers = new Dictionary<string, object>
            {
                ["aaa"] = aaaValue,
                ["bbb"] = bbbValue
            };

            var payload = Encoding.UTF8.GetBytes(text);
            channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, basicProperties: properties, body: payload);
            Console.WriteLine($"the message : {text} is send.");
        }
    }
}

2. 消费者

2.1 消费者001

using RabbitMQMsgConsumer001.ExchangeDemo;
using RabbitMQMsgConsumer001.MessageConsumer;
using System;
using System.Threading.Tasks;

namespace RabbitMQMsgConsumer001
{
    /// <summary>
    /// Console entry point for consumer 001 of the headers-exchange demo.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the headers-exchange consumer and then waits for a line of console input.
        /// Any exception raised along the way is reported by printing its message.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Headers
                // A headers exchange does not route by matching the routing key against a
                // binding key; it matches on the headers property carried by the published
                // message instead.
                // A binding between queue and exchange supplies a set of key/value pairs plus
                // an x-match argument, a string that may be "any" or "all":
                //   any - matching any single key/value pair of the headers table is enough;
                //   all - every pair has to match.
                ConsumerHeadersExchange.Receive();

                Console.ReadLine();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer001.ExchangeDemo
{
    /// <summary>
    /// Consumer 001 of the headers-exchange demo: reads from the queue bound to
    /// "AllHeadersExchange" with x-match = "all".
    /// </summary>
    public class ConsumerHeadersExchange
    {
        /// <summary>
        /// Declares the exchange, queue and binding (aaa=111, bbb=222, x-match=all), registers
        /// an auto-acknowledging consumer that prints every message body, and keeps the
        /// connection open until a line is read from the console.
        /// </summary>
        public static void Receive()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // broker address
            factory.UserName = "guest";     // user name
            factory.Password = "guest";     // password
            string allExchangeName = "AllHeadersExchange";
            string allHeadersQueueName = "AllHeadersQueue";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the exchange.
                    channel.ExchangeDeclare(exchange: allExchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
                    // Declare the queue.
                    channel.QueueDeclare(queue: allHeadersQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Bind the queue: every listed header must match.
                    channel.QueueBind(queue: allHeadersQueueName, exchange: allExchangeName, routingKey: string.Empty, arguments: new Dictionary<string, object> {
                        { "x-match","all"},
                        { "aaa","111"},
                        { "bbb","222"}
                    });

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // Decode the payload as UTF-8 text and print it.
                        var body = ea.Body;
                        var msg = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"the consumer 001 is receive : {msg} .");
                    };
                    // Start consuming. autoAck: true means no explicit BasicAck call is made
                    // from the handler.
                    channel.BasicConsume(queue: allHeadersQueueName, autoAck: true, consumer: consumer);
                    Console.WriteLine($"the message is all headers .");

                    // BasicConsume only registers the consumer and returns; deliveries arrive
                    // later through the Received event. Leaving the using blocks here would
                    // dispose the channel and connection right away and stop consumption, so
                    // wait inside the scope until the user asks to stop.
                    Console.WriteLine("press [enter] to stop consuming .");
                    Console.ReadLine();
                }
            }
        }
    }
}

2.2 消费者002

using RabbitMQMsgConsumer002.ExchangeDemo;
using System;

namespace RabbitMQMsgConsumer002
{
    /// <summary>
    /// Console entry point for consumer 002 of the headers-exchange demo.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the headers-exchange consumer and then waits for a line of console input.
        /// Any exception raised along the way is reported by printing its message.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Headers
                // A headers exchange does not route by matching the routing key against a
                // binding key; it matches on the headers property carried by the published
                // message instead.
                // A binding between queue and exchange supplies a set of key/value pairs plus
                // an x-match argument, a string that may be "any" or "all":
                //   any - matching any single key/value pair of the headers table is enough;
                //   all - every pair has to match.
                ConsumerHeadersExchange.Receive();

                Console.ReadLine();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer002.ExchangeDemo
{
    /// <summary>
    /// Consumer 002 of the headers-exchange demo: reads from the queue bound to
    /// "AnyHeadersExchange" with x-match = "any".
    /// </summary>
    public class ConsumerHeadersExchange
    {
        /// <summary>
        /// Declares the exchange, queue and binding (aaa=111, bbb=222, x-match=any), registers
        /// an auto-acknowledging consumer that prints every message body, and keeps the
        /// connection open until a line is read from the console.
        /// </summary>
        public static void Receive()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // broker address
            factory.UserName = "guest";     // user name
            factory.Password = "guest";     // password
            string anyExchangeName = "AnyHeadersExchange";
            string anyHeadersQueueName = "AnyHeadersQueue";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the exchange.
                    channel.ExchangeDeclare(exchange: anyExchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
                    // Declare the queue.
                    channel.QueueDeclare(queue: anyHeadersQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Bind the queue: a single matching header is sufficient.
                    channel.QueueBind(queue: anyHeadersQueueName, exchange: anyExchangeName, routingKey: string.Empty, arguments: new Dictionary<string, object> {
                        { "x-match","any"},
                        { "aaa","111"},
                        { "bbb","222"}
                    });

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // Decode the payload as UTF-8 text and print it.
                        var body = ea.Body;
                        var msg = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"the consumer 002 is receive : {msg} .");
                    };
                    // Start consuming. autoAck: true means no explicit BasicAck call is made
                    // from the handler.
                    channel.BasicConsume(queue: anyHeadersQueueName, autoAck: true, consumer: consumer);
                    Console.WriteLine($"the message is any headers .");

                    // BasicConsume only registers the consumer and returns; deliveries arrive
                    // later through the Received event. Leaving the using blocks here would
                    // dispose the channel and connection right away and stop consumption, so
                    // wait inside the scope until the user asks to stop.
                    Console.WriteLine("press [enter] to stop consuming .");
                    Console.ReadLine();
                }
            }
        }
    }
}

3. 结果

原文地址:https://www.cnblogs.com/Fletcher/p/14189961.html