RabbitMQ笔记-Demo(C#)

创建Connection,推荐使用长连接,长连接基础上创建多个通道

        public void CreateConnection()
        {
            this.ConnectionFactory = new ConnectionFactory
            {
                HostName = "xx.xx.xx.xx",
                //Port = 5672,
                UserName = "admin",
                Password = "admin",
                VirtualHost = "my_vhost"
            };
            this.connection = this.ConnectionFactory.CreateConnection();

            //方式2
            //ConnectionFactory factory = new ConnectionFactory();
            //factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
            //this.connection = factory.CreateConnection();
        }

Direct交换机案例

private void btnRouteKeyPublish_Click(object sender, EventArgs e)
        {
            string exchangeName = "myexchange1";
            string queueName_logElse = "log_else";
            string queueName_logError = "log_error";

            //2:创建channel
            using (var channel = connection.CreateModel())
            {
                //创建交换机
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                //创建队列
                channel.QueueDeclare(queueName_logElse, true, false, false, null);
                channel.QueueDeclare(queueName_logError, true, false, false, null);
                //绑定
                channel.QueueBind(queueName_logElse, exchangeName, "info", null);
                channel.QueueBind(queueName_logElse, exchangeName, "debug", null);
                channel.QueueBind(queueName_logElse, exchangeName, "warn", null);
                channel.QueueBind(queueName_logError, exchangeName, "error", null);

                //发布info消息
                for (int i = 0; i < 10; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha-info");
                    channel.BasicPublish(exchangeName, "info", null, msg);
                }
                //发布debug消息
                for (int i = 0; i < 10; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha-debug");
                    channel.BasicPublish(exchangeName, "debug", null, msg);
                }
                //发布warn消息
                for (int i = 0; i < 10; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha-warn");
                    channel.BasicPublish(exchangeName, "warn", null, msg);
                }
                //发布error消息
                for (int i = 0; i < 10; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha-error");
                    channel.BasicPublish(exchangeName, "error", null, msg);
                }
            }
        }

Topic交换机案例

       private void btnTopicPublish_Click(object sender, EventArgs e)
        {
            string exchangeName = "myTopicExchange1";
            string queueName1 = "topic_queue1";
            string queueName2 = "topic_queue2";

            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("myTopicExchange1", ExchangeType.Topic, true, false, null);
                channel.QueueDeclare(queueName1, true, false, false, null);
                channel.QueueDeclare(queueName2, true, false, false, null);
                channel.QueueBind(queueName1, exchangeName, "#.cn", null);
                channel.QueueBind(queueName2, exchangeName, "*.cn", null);

                //发布info消息(消息会发送到两个队列,因为都匹配)
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                    channel.BasicPublish(exchangeName, "fan.cn", null, msg);
                }
            }
        }

Header交换机案例

        private void btnHeadersPublish_Click(object sender, EventArgs e)
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("myHeadersExchange1", ExchangeType.Headers, true, false, null);
                channel.QueueDeclare("headers_queue1", true, false, false, null);
                channel.QueueBind("headers_queue1", "myHeadersExchange1", string.Empty, new Dictionary<string, object>() {
                        {"x-match","all" },//any
                        { "username","fan"},
                        { "password","123456"}
                    });
                //properties
                var properties = channel.CreateBasicProperties();
                properties.Headers = new Dictionary<string, object>();
                properties.Headers.Add("username", "fan");
                properties.Headers.Add("password", "123456");
                properties.Persistent = true;//消息持久化
                                             //发布info消息
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                    channel.BasicPublish("myHeadersExchange1", string.Empty, properties, msg);
                }
            }
        }

Demo1.使用默认交换机收发消息(省略了创建交换机,实际使用的是默认交换机,所有创建的队列都隐式绑定默认交换机)

//发消息
   using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicPublish(exchange: "",
                                             routingKey: "hello",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes("Hello World!"));//默认交换机的名字就是""
            }
//收消息
using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                var result = channel.BasicGet("hello", true);
                string msg = Encoding.UTF8.GetString(result.Body);
            }

Default Exchange:
Default Exchange 其实是AMQP中预先声明的,属于Direct类型,Default Exchange 的名是 "";
他有一个特殊的属性,当你手动创建一个队列时,MQ会自动将这个队列绑定到Default Exchange 上,绑定时 RoutingKey 与队列名称相同
默认交换器隐式绑定到每个队列,路由键等于队列名称。不能显式地绑定到默认交换器,也不能从默认交换器解除绑定。它也不能被删除。

Demo2.消费消息两种方式

        /// <summary>
        /// 手动拉消息
        /// </summary>
        /// <param name="queueName"></param>
        private void ConsumePullQueue(string queueName)
        {
            var channel = connection.CreateModel();
            var result = channel.BasicGet(queueName, false);//autoACK:false 开启手动确认
            try
            {
                //处理消息
                MessageBox.Show(Encoding.UTF8.GetString(result.Body));
                //手动确认
                channel.BasicAck(result.DeliveryTag, false);
            }
            catch
            {
                //退回
                channel.BasicRecover(true);
            }
            //直接扔了
            //channel.BasicReject(result.DeliveryTag, true);
            //否认确认
            //channel.BasicNack()
        }
        /// <summary>
        /// 自动推消息
        /// 如果多个消费者订阅一个队列,将轮询获取消息
        /// </summary>
        /// <param name="queueNames"></param>
        private void ConsumeEventQueue(string queueName)
        {
            var channel = connection.CreateModel();
            //开启QOS并行限制,每次发送一条,ack后再发送一条
            channel.BasicQos(0, 1, false);
            //通过事件订阅方式消费队列
            EventingBasicConsumer consumer1 = new EventingBasicConsumer(channel);
            consumer1.Received += (sender1, e1) =>
            {
                MessageBox.Show(Encoding.UTF8.GetString(e1.Body));
                channel.BasicAck(e1.DeliveryTag, false);//手动确认
            };
            channel.BasicConsume(queueName, false, consumer1);//开始消费.(autoACK:false 手动确认)
        }

Demo3.生产端消息确认、事务执行

//confirm确认(推荐)
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;//消息持久化
                channel.ConfirmSelect();//将信道设置成confirm模式
                                        //5:发布消息
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                    channel.BasicPublish(exchangeName, routingKey, properties, msg);
                }
                bool isSuccess = channel.WaitForConfirms();//是否发布成功
            }
//事务执行(低效,不推荐)
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;//消息持久化
                channel.TxSelect();//将信道设置成事务模式
                try
                {
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                        channel.BasicPublish(exchangeName, routingKey, properties, msg);
                    }
                    channel.TxCommit();
                }
                catch
                {
                    channel.TxRollback();
                }
            }

参考:
https://www.rabbitmq.com/getstarted.html

原文地址:https://www.cnblogs.com/fanfan-90/p/13369531.html