RabbitMQ学习笔记

一、rabbitmqctl

启动rabbitmq rabbitmqctl start_app 

关闭rabbitmq  rabbitmqctl stop_app 

格式化rabbitmq   rabbitmqctl reset (格式化之前需要先关闭rabbitmq)

强制格式化rabbitmq   rabbitmqctl force_reset  

二、ExChange

1,Direct (直连)

 通过routingkey发送到指定的queue

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DirectConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex= bus.ExchangeDeclare("direct", ExchangeType.Direct);
            var que= bus.QueueDeclare("001");//001为queue的名称
            bus.Bind(ex, que, "000");//000为routingkey

            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));

            Console.ReadKey();
        }
    }
}
DirectConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DirectProduce
{
    class Program
    {
        static void Main(string[] args)
        {

            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);a

            var message = new Message<string>("sad");
            //000:为routingkey
            bus.Publish<string>(ex, "000", false, message);

            bus.Dispose();

        }
    }
}
DirectProduce

2,Fanout(广播)

 

使用这种类型的Exchange,会忽略routing key的存在,直接将message广播到所有的Queue中。

适用场景:

                第一:大型玩家在玩在线游戏的时候,可以用它来广播重大消息。这让我想到电影微微一笑很倾城中,有款游戏需要在世界上公布玩家重大消息,也许这个就是用的MQ实现的。这让我不禁佩服肖奈,人家在大学的时候就知道RabbitMQ的这种特性了。

                第二:体育新闻实时更新到手机客户端。

                第三:群聊功能,广播消息给当前群聊中的所有人。

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
            var que = bus.QueueDeclare("directQueue");//001为queue的名称
            bus.Bind(ex, que, string.Empty);//Fanout不需要设置routingkey
            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));

            Console.ReadKey();

        }
    }
}
FanoutConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutConsumer2
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
            var que = bus.QueueDeclare("directQueue2");//001为queue的名称
            bus.Bind(ex, que, string.Empty);//Fanout不需要设置routingkey
            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));

            Console.ReadKey();
        }
    }
}
FanoutConsumer2
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FanoutProduce
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
            var message = new Message<string>("sad");

            //Fanout不需要设置routingkey
            bus.Publish<string>(ex, string.Empty, false, message);

            bus.Dispose();


        }
    }
}
FanoutProduce

3,Topic(主题)

Topic Exchange是根据routing key和Exchange的类型将message发送到一个或者多个Queue中

使用场景:

               新闻的分类更新

               同一任务多个工作者协调完成

               同一问题需要特定人员知晓

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EasyNetQ;

namespace TopicConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter");

            //正确的使用方法:
            //c=> { c.WithTopic("*.cn"); }设置    Routing key。如果没有这句,则Routing key为#
            //*.com 只是queue的名称
            //bus.Subscribe<string>("*.com", r => Console.WriteLine(r),c=> { c.WithTopic("*.cn"); });

            //subscriptionId是queue的名称
            //subscriptionId+exchangeType=唯一
            bus.Subscribe<string>("*.com", r => Console.WriteLine(r));
            bus.Subscribe<string>("*.cn", r => Console.WriteLine(r));

            Console.ReadKey();
        }
    }
}
TopicConsumer
using EasyNetQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TopicProduce
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter");

            bus.Publish<string>("你好", "www.oyunkeji.com");
            //bus.Publish<string>("你好", c => c.WithTopic("www.oyunkeji.com"));

            bus.Dispose();
        }
    }
}
TopicProduce

4,Headers(头信息)

 

它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了

x-match的头部必须设置:

当x-match的值设置为all时,header信息必须全部满足才会匹配上

当x-match的值设置为any时,header信息满足其中任意一个就会匹配上

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HeadersConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);
            var que = bus.QueueDeclare("headersQueue");//001为queue的名称
            bus.Bind(ex, que, string.Empty,new Dictionary<string, object>() {
                { "x-match","all"},
                { "username","hunter"},
                { "password","hunter"}
            });//Header不需要设置routingkey
            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));

            Console.ReadKey();

        }
    }
}
HeadersConsumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HeadersProduce
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);

            var properties = new MessageProperties();
            properties.Headers.Add("username", "hunter");
            properties.Headers.Add("password", "hunter");

            //Fanout不需要设置routingkey
            bus.Publish(ex, string.Empty, false, properties, Encoding.UTF8.GetBytes("你好"));
            bus.Dispose();

        }
    }
}
HeadersProduce

案例下载:https://pan.baidu.com/s/1gVBO3qLl9Dw5tIhIpETvkw

三、Arguments

1,Message TTL(x-message-ttl)

发布到队列的消息在丢弃之前可以存活多长时间(毫秒)。

①针对队列中的所有消息

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

            //001为queue的名称
            //001队列下的消息5秒钟没有被消费自动删除
            var que = bus.QueueDeclare("001",perQueueMessageTtl:5000);
            bus.Bind(ex, que, "000");//000为routingkey

            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));


            Console.ReadKey();
        }
    }
}
consumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);
            var properties = new MessageProperties();
            var message = new Message<string>("你好");
            //000:为routingkey
            bus.Publish<string>(ex, "000", false, message);

            bus.Dispose();

        }
    }
}
produce

②指定某个消息

using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

            //001为queue的名称
            //001队列下的消息5秒钟没有被消费自动删除
            var que = bus.QueueDeclare("001");
            bus.Bind(ex, que, "000");//000为routingkey

            bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Got message: '{0}'", message);
            }));


            Console.ReadKey();
        }
    }
}
consumer
using EasyNetQ;
using EasyNetQ.Topology;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced;
            var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);
            var properties = new MessageProperties();
            properties.Expiration = "5000";//单位:毫秒
            //000:为routingkey
            bus.Publish(ex, "000", false, properties, Encoding.UTF8.GetBytes("你好"));

            bus.Dispose();

        }
    }
}
produce

2,Auto expire(x-expires)

 queue在指定的时间未被访问,就会被删除(毫秒)。

3,Max length(x-max-length)

限定队列的最大长度,

4,Max length bytes(x-max-length-bytes)

限定队列的最大占用空间大小

5,Overflow behaviour(x-overflow)

 设置队列溢出行为。这决定了在到达队列的最大长度时消息会发生什么情况。有效值是 drop-head(删除头)或者 reject-publish(拒绝发布) 。

6,Dead letter exchange/Dead letter routing key(x-dead-letter-exchange/x-dead-letter-routing-key)

queue中的message过期时间。

basicreject...basicnack等等。。。

这三种情况一般会drop这些message。。。

Dead letter exchange:时候我们不希望message被drop掉,而是走到另一个队列中,又或者是保存起来

Dead letter routing key:指定的routing key

8,Maximum priority

(x-max-priority)

定义消息的优先级

9,Lazy mode(x-queue-mode)

将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少RAM使用量; 如果未设置,队列将保留内存中的缓存以尽可能快地传递消息。

10,Master locator (x-queue-master-locator)

将队列设置为主位置模式,确定队列主节点在节点集群上声明时所处的规则。

四、高可靠消息队列

1,消费端的确认

①自动确认

message出队列的时候就自动确认

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            //创建exchange
            channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

            //创建queue
            channel.QueueDeclare("queue1", true, false, false, null);

            //exchange绑定queue
            channel.QueueBind("queue1", "exchange1", "queue1", null);


            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (send, e) =>
            {
                Console.WriteLine(Encoding.UTF8.GetString(e.Body));
            };

            //autoAck 设置为true:自动确认
            channel.BasicConsume("queue1", true, consumer);

            Console.ReadKey();
        }

    }
}
Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            for (int i = 0; i < 10; i++)
            {
                channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
            }

            channel.Dispose();

            Console.WriteLine("发布完毕");


            Console.ReadKey();

        }
    }
}
Produce

②手动确认

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            //创建exchange
            channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

            //创建queue
            channel.QueueDeclare("queue1", true, false, false, null);

            //exchange绑定queue
            channel.QueueBind("queue1", "exchange1", "queue1", null);

            var result = channel.BasicGet("queue1", false);

            Console.WriteLine(Encoding.UTF8.GetString(result.Body));

            //拒绝掉
            //requeue:true:重新放回队列 false:直接丢弃
            channel.BasicReject(result.DeliveryTag, false);

            //BasicRecover方法则是进行补发操作,
            //其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer接收到,设置为false是只补发给当前的consumer
            //channel.BasicRecover(true);

            Console.ReadKey();
        }

    }
}
Consumer

2,发布端的确认

其中事务的性能消耗最大,confirm其次

①confirm机制

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            channel.ConfirmSelect();
            for (int i = 0; i < 10000; i++)
            {
                channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
            }
            var isallPublish = channel.WaitForConfirms();
            Console.WriteLine(isallPublish);

            channel.Dispose();
            connection.Dispose();

            Console.WriteLine("发布完毕");

            Console.ReadKey();

        }
    }
}
Produce

② 事物机制

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            try
            {
                channel.TxSelect();
                for (int i = 0; i < 10000; i++)
                {
                    channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                }
                channel.TxCommit();
            }
            catch (Exception ex)
            {
                channel.TxRollback();
            }

            channel.Dispose();
            connection.Dispose();

            Console.WriteLine("发布完毕");

            Console.ReadKey();

        }
    }
}
Produce

五、Consumer消费问题

Consumer消费时,不管你是否却不确认,消息都会一股脑全部打入到你的consumer中去,导致consumer端内存暴涨(EasynetQ的Subscribe不会出现这种情况)

解决方法:

 ①eventbasicconsumer+QOS

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using EasyNetQ;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            //创建exchange
            channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

            //创建queue
            channel.QueueDeclare("queue1", true, false, false, null);

            //exchange绑定queue
            channel.QueueBind("queue1", "exchange1", "queue1", null);


            //prefetchSize:预取大小   prefetchCount:预取数量
            channel.BasicQos(0, 1, false);

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (send, e) =>
            {
                Console.WriteLine(Encoding.UTF8.GetString(e.Body));

                channel.BasicAck(e.DeliveryTag, false);//确认送达
                Thread.Sleep(1000000);
            };

            //autoAck 设置为true:自动确认
            channel.BasicConsume("queue1", false, consumer);


            Console.ReadKey();

            Console.ReadKey();
        }

    }
}
Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Produce
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "hunter",
                Password = "hunter"
            };

            //创建connection
            var connection = factory.CreateConnection();

            //创建chanel
            var channel = connection.CreateModel();

            try
            {
                channel.TxSelect();
                for (int i = 0; i < 10000; i++)
                {
                    channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                }
                channel.TxCommit();
            }
            catch (Exception ex)
            {
                channel.TxRollback();
            }

            channel.Dispose();
            connection.Dispose();

            Console.WriteLine("发布完毕");

            Console.ReadKey();

        }
    }
}
Produce
原文地址:https://www.cnblogs.com/zd1994/p/8659643.html