WinForm实现Rabbitmq官网6个案例-Publish/Subscribe

代码:

namespace RabbitMQDemo
{
    /// <summary>
    /// WinForms demo of the RabbitMQ "Publish/Subscribe" tutorial: the publisher text box
    /// sends messages to a fanout exchange, and two consumers (each bound through its own
    /// server-named queue) display every message they receive.
    /// </summary>
    public partial class PublishSubscribe : Form
    {
        private string exchangeName = "logs";
        private string exchangeType = ExchangeType.Fanout; // fanout: broadcast to every bound queue

        // Delegate used to marshal text updates from the consumer thread onto the UI thread.
        Action<string, TextBox> SetText;

        private static readonly PublishSubscribe _PublishSubscribe;

        static PublishSubscribe()
        {
            _PublishSubscribe = new PublishSubscribe();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static PublishSubscribe SingleForm { get { return _PublishSubscribe; } }

        private PublishSubscribe()
        {
            // NOTE(review): this is a process-wide static switch. This form marshals its own
            // updates through BeginInvoke and does not need it; it is kept only so that the
            // global behavior seen by other forms is unchanged.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();

            // Wire the delegate BEFORE starting the consumers so that a message delivered
            // immediately after BasicConsume never observes a null SetText.
            SetText += OnSetText;
            ReceiveMsg(txtConsumer1); // consumer 1
            ReceiveMsg(txtConsumer2); // consumer 2
        }

        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }

        /// <summary>
        /// Publishes the text of the publisher text box to the fanout exchange.
        /// Shows a prompt and sends nothing when the text is empty or whitespace;
        /// connection or publish failures are reported in a message box.
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("请输入要发送的消息");
                return; // do not publish an empty message
            }

            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(
                        exchange: exchangeName,
                        type: exchangeType);

                    var body = Encoding.UTF8.GetBytes(message);
                    // A fanout exchange ignores the routing key, hence the empty string.
                    channel.BasicPublish(exchange: exchangeName,
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);
                }
            }
            catch (Exception ex)
            {
                // Same reporting style as ReceiveMsg, e.g. when the broker is unreachable.
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Starts a consumer whose received messages are appended to <paramref name="box"/>.
        /// The connection and channel stay open for the lifetime of the form and are
        /// released when the form is disposed. Setup failures are shown in a message box.
        /// </summary>
        /// <param name="box">Text box that displays the messages of this consumer.</param>
        private void ReceiveMsg(TextBox box)
        {
            IConnection connection = null;
            IModel channel = null;
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();

                // Declare the exchange (idempotent; the publisher declares the same one).
                channel.ExchangeDeclare(
                    exchange: exchangeName,
                    type: exchangeType);

                // Let the broker generate a queue name (non-durable, exclusive, auto-delete),
                // so every consumer always receives through a fresh queue of its own.
                var queueName = channel.QueueDeclare().QueueName;

                // The queue only receives messages once it is bound to the exchange.
                channel.QueueBind(
                    queue: queueName,
                    exchange: exchangeName,
                    routingKey: "");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var msg = Encoding.UTF8.GetString(ea.Body);
                    PostToUi(box, msg);
                };
                channel.BasicConsume(
                    queue: queueName,
                    noAck: true,
                    consumer: consumer);

                // Release the broker resources together with the form.
                var ownedChannel = channel;
                var ownedConnection = connection;
                Disposed += (s, e) =>
                {
                    DisposeQuietly(ownedChannel);
                    DisposeQuietly(ownedConnection);
                };
            }
            catch (Exception ex)
            {
                // Setup failed part-way: do not leak whatever was already opened.
                DisposeQuietly(channel);
                DisposeQuietly(connection);
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Queues a text update for <paramref name="box"/> on the UI thread.
        /// The message is dropped when the control has no window handle yet or is
        /// already disposed, because there is no UI thread queue to post to.
        /// </summary>
        private void PostToUi(TextBox box, string msg)
        {
            if (box.IsDisposed || !box.IsHandleCreated)
            {
                return;
            }

            try
            {
                // BeginInvoke (not Invoke) so the consumer thread never blocks on the UI
                // thread, which could otherwise deadlock while the form is shutting down.
                box.BeginInvoke(SetText, msg, box);
            }
            catch (InvalidOperationException)
            {
                // The control was torn down between the check above and the call
                // (ObjectDisposedException derives from InvalidOperationException).
            }
        }

        /// <summary>
        /// Best-effort disposal used during teardown; a failure to close an already
        /// broken connection must not mask the original error or crash form disposal.
        /// </summary>
        private static void DisposeQuietly(IDisposable resource)
        {
            if (resource == null)
            {
                return;
            }

            try
            {
                resource.Dispose();
            }
            catch (Exception)
            {
                // Intentionally ignored: nothing useful can be done at this point.
            }
        }

        /// <summary>
        /// Appends <paramref name="text"/> followed by a line break to <paramref name="box"/>.
        /// Must run on the UI thread.
        /// </summary>
        private void OnSetText(string text, TextBox box)
        {
            box.Text += text + Environment.NewLine;
        }
    }
}
View Code

界面:

大概流程:

生产者把一条消息发送到 fanout 类型的 exchange(交换机),交换机将该消息广播到绑定在它上面的 2 个队列,因此两个队列都会收到这条消息;消费者 1、2 分别从各自的队列中取出消息并进行处理

测试结果:

原文地址:https://www.cnblogs.com/zhyue93/p/rabbitmq-publishsubscribe.html