rabbitMQ_Publish/Subscribe(三)

发布/订阅

 生产者发布信息,多个订阅者可以同时接收到信息。

转发器

现在是时候在RabbitMQ中引入完整的消息传递模式了。

让我们快速了解我们在以前的教程中介绍的内容:

  • 生产者是一个发送消息的应用程序。
  • 队列是存储消息的缓冲器。
  • 消费者是接收消息的应用程序。

RabbitMQ中的消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,生产者通常甚至不知道是否将消息传递到某个队列。

相反,生产者只能将信息发送到转发器。转发是一件非常简单的事情。一方面,它收到来自生产者的消息,另一方将它们推送到队列。转发器必须准确知道接收到的消息如何处理。应该转发到特定队列吗?应该转发到多个队列吗?或者应该丢弃。其规则由转发器类型来决定 。

 

有几种转发类型可用:direct, topic, headers 和 fanout.。我们将重点关注最后一个 - fanout。让我们创建一个这种类型的转发器,并将其称为logs转发器:

channel.exchangeDeclare(“logs”,“fanout”);

fanout类型的转发非常简单。它只是将所有收到的消息广播到所有绑定到这个转发器的队列上。

罗列转发器

要列出服务器上的转发器,您可以运行rabbitmqctl:

sudo rabbitmqctl list_exchanges

在这个列表中会有一些名为amq.*这样前缀的转发器和默认(未命名)转发器。这些是默认创建的,但是不太可能需要使用它们。

匿名转发器

在前面部分,我们没有定义任何转发器,但仍然能够将消息发送到队列上。这是可能的,因为我们使用了默认的转发器,我们通过空字符串(“”)标识。

回想一下我们之前发布的消息:

channel.basicPublish(“”,“hello”,null,message.getBytes());

第一个参数是转发器的名称。空字符串表示默认或匿名转发器:消息通过routekey(第二个参数)路由到指定的队列。

现在,我们可以发布到我们定义的转发器logs:

channel.basicPublish(“logs”,“”,null,message.getBytes());

临时队列

我们之前使用的是具有指定名称的队列(hello ,task_queue这类指定了名称的队列)。当您想要在生产者和消费者之间共享队列时,给队列一个名字很重要。

但是本例中,我们不想自己去创建一个特定名字的队列,我们希望一个消费者连接到rabbitMQ就自动创建一个具有随机名称的队列,然后当这个消费者断开连接的时候

就自动将这个队列删除。

在java中当我们没有为queueDeclare()提供参数时, 我们创建了一个具有生成随机名称的非持久性的,排他的,自动删除的队列:

String queueName = channel.queueDeclare().getQueue();

此时,queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

绑定

我们已经创建了一个fanout转发器和随机名称队列。现在我们需要告诉转发器发送消息到这些随机名称队列中,所以我们需要将这些队列绑定到转发器上,它知道要发送消息给哪些队列。

channel.queueBind(queueName,“logs”,“”);

从现在开始,logs转发器将把消息转发到我们的队列中。

列出绑定

你可以列出现有的绑定

rabbitmqctl list_bindings

 

 

生产者EmitLogl.java

 1 package com.rabbitMQ;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 public class EmitLog {
 8 
 9     private static final String EXCHANGE_NAME = "logs";//转发器
10 
11     public static void main(String[] argv)
12                   throws java.io.IOException, Exception {
13 
14         ConnectionFactory factory = new ConnectionFactory();
15         factory.setHost("localhost");
16         Connection connection = factory.newConnection();
17         Channel channel = connection.createChannel();
18 
19         //定义一个转发器,转发器名为logs,转发器类型为fanout,他可以对信息进行广播到所有绑定了的队列中
20         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
21 
22         String message = getMessage(argv);
23         //由于是广播到所有绑定了这个转发器的队列,那么可以不写routeKey
24         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
25         System.out.println(" [x] Sent '" + message + "'");
26 
27         channel.close();
28         connection.close();
29     }
30     private static String getMessage(String[] strings) {
31         if (strings.length < 1)
32             return "Hello World!";
33         return joinStrings(strings, " ");
34     }
35 
36     private static String joinStrings(String[] strings, String delimiter) {
37         int length = strings.length;
38         if (length == 0)
39             return "";
40         StringBuilder words = new StringBuilder(strings[0]);
41         for (int i = 1; i < length; i++) {
42             words.append(delimiter).append(strings[i]);
43         }
44         return words.toString();
45     }
46 }

消费者ReceiveLogs.java

 1 package com.rabbitMQ;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 
 7 public class ReceiveLogs {
 8     //转发器名称
 9   private static final String EXCHANGE_NAME = "logs";
10 
11   public static void main(String[] argv) throws Exception {
12     ConnectionFactory factory = new ConnectionFactory();
13     factory.setHost("localhost");
14     Connection connection = factory.newConnection();
15     Channel channel = connection.createChannel();
16     //可以使用rabbitmqctl list_exchanges查看有哪些转发器
17     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
18     //定义一个排他的,自动删除的,非持久性的队列,名称是随机的
19     String queueName = channel.queueDeclare().getQueue();
20     //转发器绑定队列
21     //可使用rabbitmqctl list_bindings查看绑定列表
22     channel.queueBind(queueName, EXCHANGE_NAME, "");
23 
24     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
25 
26     Consumer consumer = new DefaultConsumer(channel) {
27       @Override
28       public void handleDelivery(String consumerTag, Envelope envelope,
29                                  AMQP.BasicProperties properties, byte[] body) throws IOException {
30         String message = new String(body, "UTF-8");
31         System.out.println(" [x] Received '" + message + "'");
32       }
33     };
34     boolean autoAck = true;
35     channel.basicConsume(queueName, autoAck, consumer);
36   }
37 }

启动两个ReceiveLogs.java实例,然后启动EmitLog.java,两个ReceiveLogs.java将同时收到消息

原文地址:https://www.cnblogs.com/honger/p/6963234.html