RabbitMQ 初识

  • RabbitMQ 官网地址 https://www.rabbitmq.com/
  • 使用前先下载ErLang 环境 http://www.erlang.org/download.html
  • 在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面  
    rabbitmq-plugins enable rabbitmq_management
  • web端默认地址http://localhost:15672  默认账户名 密码 guest guest

  nuget 程序 RabbitMQ 安装完成后开始测试代码。

 服务端:

  

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "dp";
            factory.Password = "as1234";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    bool durable = true;//设置队列持久化
                    channel.QueueDeclare("hello", durable, false, false, null);
                    string message = GetMessage(args);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;//设置消息持久化
                    properties.DeliveryMode = 2;//或者可以使用 properties.SetPersistent(true);

                    //需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("", "hello", properties, body);

                    Console.WriteLine(" send {0}", message);
                }
            }
            Console.ReadKey();
        }
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }
}

  客户端:

  

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Receive
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "dp";
            factory.Password = "as1234";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);
                    channel.BasicQos(0, prefetchCount:1, global:false);//配置公平分发机制 设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
                    var consumer = new QueueingBasicConsumer(channel);
                    bool autoAck = false;//设置是否自动ack 默认是true
                    channel.BasicConsume("hello", autoAck, consumer);

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine("Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);//ack
                    }
                }
            }
        }
    }
}

以上没有考虑 Exchange和routingKey绑定的模式。只是针对某个队列的操作。其他相关信息参考地址:http://www.cnblogs.com/sheng-jie/p/7192690.html

原文地址:https://www.cnblogs.com/q975261413/p/RabbitMQ.html