.net core 2.1 + RabbitMQ 广播模式 简单使用

注* 【这里只有干活,没有花里胡哨的魅惑】,在查看之前希望能简单捎带看下这几句知心话:写一些东西,不是为了显摆,或者是为了一些目的性的东西,曾经写过一些东西,后来在岁月长河中丢失了一些东西.

成长嘛!就是在一步步向前,,强者!世界上没有所谓的强者,真正的强者,是那些无视别人白眼,义无反顾,一步一步,一直不断努力向前的人;【看到这里的盆友,如果觉得废话,那就看盆友想看的吧,不过还是希望盆友能,品品,细细品】

1:实列大概:

 1------(1)工具类部分:

 (1)代码部分(知道盆友大家都喜欢Ctrl+C):

public class RabbitMQHelp
{

/// <summary>
/// 交换器
/// </summary>
public const string ExchangeName = "exchange119";
//队列名称
public const string QueueName = "queue119";
// 队列名称
public const string QueueName3 = "queue118";
//创建连接工厂对象
public static IConnectionFactory conFactory = new ConnectionFactory
{
HostName = "127.0.0.1",//IP地址
Port = 5672,//端口号 区分:15672指客户端,5672服务端
UserName = "guest",//用户账号【默认guest】
Password = "guest",//用户密码 【默认guest】
//VirtualHost = "/vhost001", //如果不设置,虚拟主机名称路径默认为 /
AutomaticRecoveryEnabled = true
};

(2)发布者:

 代码部分:

/// <summary>
/// 发布者
/// </summary>
public static void PushMessage001()
{
using (IConnection con = RabbitMQHelp.conFactory.CreateConnection())//创建连接对象
{
using (IModel channel = con.CreateModel())//创建连接会话对象
{
//arguments:它是扩展参数,用于扩展AMQP协议自制定化的使用
channel.ExchangeDeclare(RabbitMQHelp.ExchangeName, "fanout", durable: false, autoDelete: false, arguments: null); //声明一个Exchange(交换机)

var props = channel.CreateBasicProperties();
props.Persistent = true;

while (true)
{
Console.WriteLine("消息内容:");
String message = Console.ReadLine();

//发送的消息必须是二进制的
var msg = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: RabbitMQHelp.ExchangeName, routingKey: "", basicProperties: props, body: msg);

////消息内容
//byte[] body = Encoding.UTF8.GetBytes(message);
////发送消息
//channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: props, body: body);
Console.WriteLine("成功发送消息:" + message);
}
}
}
}

(2)消费者部分:

 代码部分:

/// <summary>
/// 消费者2
/// </summary>
/// <param name="TypeValue"></param>
public static void ReceiveMessage001(int TypeValue)
{
using (IConnection con = RabbitMQHelp.conFactory.CreateConnection())//创建连接对象
{
using (IModel channel = con.CreateModel())//创建连接会话对象
{
//创建交换机这里将Exchange(交换机)Type设定为fanout 【消费者端的交换机名称要与生产者端的交换机名称保持一致】

channel.ExchangeDeclare(RabbitMQHelp.ExchangeName, "fanout", durable: false, autoDelete: false, arguments: null);
//创建队列【消费者端需要创建队列,生产者端不需要创建队列】
channel.QueueDeclare(RabbitMQHelp.QueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);

channel.QueueBind(RabbitMQHelp.QueueName, RabbitMQHelp.ExchangeName, routingKey: "");
//fanout模式下Exchange与Queue绑定不需要指定RoutingKey,所以这里设为空字符串就行
//channel.BasicAck(1, true);
//var consumer = new EventingBasicConsumer(channel);
//consumer.Received += (model, ea) =>
//{
// byte[] message = ea.Body;//接收到的消息
// Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//};
//channel.BasicConsume(TypeValue > 1 ? RabbitMQHelp.QueueName : RabbitMQHelp.QueueName3, autoAck: false, consumer);

while (true)
{
//System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
BasicGetResult msgResponse = channel.BasicGet(RabbitMQHelp.QueueName, autoAck: true);// 这个true表示消费完这条数据是否删除,true表示删除,false表示不删除
if (msgResponse != null)
{
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
}
}
}
}
}

(2====1)另一个消费者:(代码类同上)

(3)执行 发布 和 订阅:

(3====1) 发布者:

消费者2:

消费者3:

执行结果:

 

原文地址:https://www.cnblogs.com/tianxujun/p/13023797.html