学习rabbitmq (二) 使用rabbitmq <完结>

以为rabbitmq会折腾很久,但没有想到就这么多点内容,主要是服务端的懒得去折腾,比如docker的转移啊,发布啊,部署啥的

今天写了一些代码,用的c#弄的,新建两个项目,一个sender,一个rec,需要说的都在代码里了

就说一下在vs里安装rabbitmq的client,如果看不懂,也懒得说了

以下是发送端的代码,就一个窗体

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitmq_example_sender
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            //简单队列模式
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
            factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
            factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest

            //创建连接对象
            using (var connection = factory.CreateConnection())
            {
                //创建一个新的通道、会话和模型
                using (var channel = connection.CreateModel())
                {
                    /*
                    * 创建一个名为 myQueue1 的消息队列,如果名称相同不会重复创建,参数解释:
                    * 参1:myQueue1, 消息队列名称;
                     * 参2:false, 是否持久化,持久化的队列会存盘,服务器重启后任然存在;
                    * 参3:false, 是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
                     * 参4:false, 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
                     * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
                     */
                    channel.QueueDeclare("myQueue1", false, false, false, null);
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 1;
  
                    for (int i = 0; i < 10000; i++)
                    {
                        string message = $"Hello RabbitMQ {i}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", "myQueue1", properties, body);
                        Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                        System.Threading.Thread.Sleep(1000);    //间隔1秒钟发送一次
                    }

                }

            }
        }

        private void button2_Click(object sender, EventArgs e)
        {
            
            
        }

        private void button2_Click_1(object sender, EventArgs e)
        {
            //工作队列模式
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
            factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
            factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest

            //创建连接对象
            using (var connection = factory.CreateConnection())
            {
                //创建一个新的通道、会话和模型
                using (var channel = connection.CreateModel())
                {
                    /*
                    * 创建一个名为 myQueue1 的消息队列,如果名称相同不会重复创建,参数解释:
                    * 参1:myQueue1, 消息队列名称;
                     * 参2:false, 是否持久化,持久化的队列会存盘,服务器重启后任然存在;
                    * 参3:false, 是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
                     * 参4:false, 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
                     * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
                     */
                      
                    bool durable = false;
                    //上面这个参数,我不知道为什么要放在生产端,而且还要求消费端用一样的参数
                    //另外一点跟DeliveryMode或Persistent都是搞持久化的,网上的讲得感觉很模乎,
                    //我理解的是这样的:消息->队列->交换器 ,这三个内容都需要确认是否持久化,也就是说DeliveryMode或Persistent
                    //是决定这个消息是否持久化,QueueDeclare的durable是决定队列是否持久化,而申明exchange 交换器的时候还有一个exchangeDeclare的durable,
                    //是决定这个交换器的持久化
                    //https://blog.csdn.net/u013256816/article/details/60875666/ 只有这个看下是否能理解了

                    channel.QueueDeclare("myQueue1", durable, false, false, null);
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode =1;
                    //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                    for (int i = 0; i < 100; i++)
                    {
                        string message = $"Hello RabbitMQ {i}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", "myQueue1", properties, body);
                        Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                        System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                    }

                }

            }
        }

         
        private void button3_Click(object sender, EventArgs e)
        {
            String EXCHANGE_NAME = "fanout_exchange";
            //交换器(fanout)
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
            factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
            factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest

            //创建连接对象
            using (var connection = factory.CreateConnection())
            {
                //创建一个新的通道、会话和模型
                using (var channel = connection.CreateModel())
                {
                    bool durable = false;
                    //3.声明交换器     
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Fanout, durable);
 
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 1;
                    //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                    for (int i = 0; i < 50; i++)
                    {
                        string message = $"Hello RabbitMQ {i}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(EXCHANGE_NAME, "", properties, body);
                        Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                        System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                    }

                }

            }
        }

        private void button4_Click(object sender, EventArgs e)
        {
            String EXCHANGE_NAME = "direct_exchange";
            //路由(direct)
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
            factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
            factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest

            //创建连接对象
            using (var connection = factory.CreateConnection())
            {
                //创建一个新的通道、会话和模型
                using (var channel = connection.CreateModel())
                {
                    bool durable = false;
                    //3.声明交换器     Direct 才会处理 路由key
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 1;
                    //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                    for (int i = 0; i < 50; i++)
                    {
                        string message = $"Hello RabbitMQ {i}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(EXCHANGE_NAME, "update", properties, body);
                        Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                        System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                    }

                }

            }
        }

        private void button5_Click(object sender, EventArgs e)
        {
            //这个不想写了,就是路由的key支持通配符
        }
    }
}

以下是接收端的代码

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace rabbitmq_example_rec
{
    class Program
    {
        static void rec_simple()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            channel.QueueDeclare("myQueue1", false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            channel.BasicConsume("myQueue1", true, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"Received: {message}, {DateTime.Now.ToString("HH: mm:ss fff")}");
            };

            Console.ReadLine();
            connection.Close();
            channel.Close();
        }
        static void rec_work1()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            channel.QueueDeclare("myQueue1", false, false, false, null);
            channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
  
            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            
            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"rec_work1 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);   //手工确认
                Thread.Sleep(200);
            };
            channel.BasicConsume("myQueue1", false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
            Console.ReadLine();
            connection.Close();
            channel.Close();
        }
        static void rec_work2()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            channel.QueueDeclare("myQueue1", false, false, false, null);
            channel.BasicQos(0, 1, false);  //该方法可以看出休息小半秒和一秒的区别
 
            
            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                                                                //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"                                       rec_work2 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);        //手工确认
                Thread.Sleep(1000);
            };
            channel.BasicConsume("myQueue1", false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
            Console.ReadLine();
            connection.Close();
            channel.Close();


 
        }



        static void rec_fanout1()
        {
            String QUEUE_NAME = "fanout_queue_1";

            String EXCHANGE_NAME = "fanout_exchange";

            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            //3.声明交换器
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
            //4.绑定队列到交换器
            channel.QueueBind (QUEUE_NAME, EXCHANGE_NAME, "");

            channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别

            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)

            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"rec_work1 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);   //手工确认
                Thread.Sleep(200);
            };

            Console.ReadLine();
            connection.Close();
            channel.Close();
        }
        static void rec_fanout2()
        {
            Thread.Sleep(15000);
            String QUEUE_NAME = "fanout_queue_2";

            String EXCHANGE_NAME = "fanout_exchange";

            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            //3.声明交换器
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
            //4.绑定队列到交换器
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别

            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)

            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"                                       rec_work2 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);   //手工确认
                Thread.Sleep(400);
            };

            Console.ReadLine();
            connection.Close();
            channel.Close();             


        }


        static void rec_direct1()
        {
            String QUEUE_NAME = "direct_queue_1";

            String EXCHANGE_NAME = "direct_exchange";

            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            //3.声明交换器
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
            //4.绑定队列到交换器
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "add");



            channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别

            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)

            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"rec_work1 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);   //手工确认
                Thread.Sleep(200);
            };

            Console.ReadLine();
            connection.Close();
            channel.Close();
        }
        static void rec_direct2()
        {
            String QUEUE_NAME = "direct_queue_2";

            String EXCHANGE_NAME = "direct_exchange";

            var factory = new ConnectionFactory();
            factory.HostName = "192.168.165.146";
            factory.UserName = "guest";
            factory.Password = "guest";
            //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            //3.声明交换器
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
            //4.绑定队列到交换器
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
 


            channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别

            var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
            channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)

            //该事件在接收到消息时触发
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body.ToArray();   //消息字节数组
                string message = Encoding.UTF8.GetString(body); //消息内容

                Console.WriteLine($"                                       rec_work2 Rec: {message}");
                channel.BasicAck(e.DeliveryTag, false);   //手工确认
                Thread.Sleep(400);
            };

            Console.ReadLine();
            connection.Close();
            channel.Close();

             


        }
        static void Main(string[] args)
        {
            //简单模式
            ////rec_simple();

            ////工作模式 启动两个消费者
            //Thread t1 = new Thread(new ThreadStart(rec_work1));
            //t1.Start();
            //Thread t2 = new Thread(new ThreadStart(rec_work2));
            //t2.Start();


            //////交换器(fanout)
            ////Thread t1 = new Thread(new ThreadStart(rec_fanout1));
            ////t1.Start();
            ////Thread t2 = new Thread(new ThreadStart(rec_fanout2));
            ////t2.Start();


            //交换器(direct)  //个人感觉,做ERP相关的管理系统,在实际应中,这种使用的可能性比较大
            Thread t1 = new Thread(new ThreadStart(rec_direct1));
            t1.Start();
            Thread t2 = new Thread(new ThreadStart(rec_direct2));
            t2.Start();


        }
    }
}
原文地址:https://www.cnblogs.com/szyicol/p/13044752.html