C#中使用RabbitMQ收发队列消息

一、程序使用 .NET Core,通过 NuGet 引入 RabbitMQ.Client:

  Install-Package RabbitMQ.Client -Version 4.1.3

二、消息发布端:

  

using RabbitMQ.Client;
using System;
using System.Text;

namespace ClientDemo
{
    /// <summary>
    /// Console publisher: reads lines from standard input and publishes each one
    /// as a persistent message to <c>my-exchange</c>, routed to <c>my-queue</c>.
    /// </summary>
    public class Client
    {
        private const string ExchangeName = "my-exchange";
        private const string QueueName = "my-queue";

        /// <summary>
        /// Declares the exchange, queue and binding, then publishes every line typed
        /// on the console until the user enters <c>exit</c> or input reaches end-of-stream.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            ConnectionFactory factory = new ConnectionFactory();
            factory.Uri = new Uri("amqp://guest:guest@localhost:5672/");

            // The using blocks guarantee the connection and channel are released
            // even when a declare or publish call throws.
            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                // NOTE(review): the exchange is declared non-durable while the queue and the
                // messages are durable. Left unchanged because redeclaring an existing exchange
                // with a different durability is rejected by the broker.
                model.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
                model.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                // The queue name doubles as the routing key for the direct exchange.
                model.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: QueueName);

                var props = model.CreateBasicProperties();
                props.Persistent = true; // mark messages as persistent

                while (true)
                {
                    Console.WriteLine("请输入要发送的消息:");
                    var line = Console.ReadLine();

                    // ReadLine returns null at end-of-input (redirected stdin, Ctrl+Z/Ctrl+D);
                    // treat that like "exit" instead of passing null to GetBytes.
                    if (line == null || line == "exit")
                    {
                        break;
                    }

                    model.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: Encoding.UTF8.GetBytes(line));
                }

                // Close explicitly for a graceful AMQP shutdown before disposal.
                model.Close();
                conn.Close();
            }
        }
    }
}

三、消息消费端:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServerDemo
{
    /// <summary>
    /// Console consumer: polls <c>my-queue</c> with <c>BasicGet</c>, prints each
    /// message body as UTF-8 text and acknowledges it.
    /// </summary>
    public class Server
    {
        private const string QueueName = "my-queue";

        /// <summary>
        /// Connects with the <see cref="ConnectionFactory"/> defaults, then polls the queue
        /// until Ctrl+C is pressed, after which the channel and connection are closed.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            // No Uri is set, so the factory's default endpoint and credentials are used.
            ConnectionFactory factory = new ConnectionFactory();

            using (var stop = new CancellationTokenSource())
            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                // Ctrl+C requests a graceful stop instead of killing the process,
                // so the Close calls below are actually reached.
                ConsoleCancelEventHandler onCancel = (sender, e) =>
                {
                    e.Cancel = true;
                    stop.Cancel();
                };
                Console.CancelKeyPress += onCancel;

                // Declare the queue with the same arguments the publisher uses so the consumer
                // can start first; redeclaring with identical arguments is a no-op.
                model.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                while (!stop.IsCancellationRequested)
                {
                    var result = model.BasicGet(queue: QueueName, autoAck: false);
                    if (result == null)
                    {
                        // Queue is empty: back off briefly before polling again.
                        Thread.Sleep(10);
                        continue;
                    }

                    var msg = Encoding.UTF8.GetString(result.Body);
                    Console.WriteLine(msg);

                    // autoAck is false, so the message must be acknowledged explicitly;
                    // otherwise it stays unacked and is redelivered after the connection closes.
                    model.BasicAck(result.DeliveryTag, false);
                }

                Console.CancelKeyPress -= onCancel;

                // Close explicitly for a graceful AMQP shutdown before disposal.
                model.Close();
                conn.Close();
            }
        }
    }
}
原文地址:https://www.cnblogs.com/songxingzhu/p/7229952.html