RabbitMQ的工作模式

RabbitMQ的六种工作模式(含PHP代码实现)

     接着上一篇文章RabbitMQ入门,我们再来看下RabbitMQ的工作模式有哪些。

     1、简单队列模式(simple queue)-最简单的收发模式

     1)只包含一个生产者和一个消费者

     2)生产者将消息发送到队列中,消费者从队列中接收消息

      

     工作过程:

     消息的消费者(consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

     可能存在的问题:

     消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

     解决办法:

     这里可以设置成手动的ack,但如果设置成手动ack,处理完后及时发送ack消息给队列,否则会造成内存溢出。

     扩展一下:消息确认-自动应答和手动应答

     noack:true 自动应答 ,false(手动应答) 默认为false-关闭

     noack=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

     注意:

     生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。

     实际的情况是:当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空,类型为default的交换机(exchange)上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

     下面我们从代码层面来感受一下:

     1)生产

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use PhpAmqpLibMessageAMQPMessage;
 7 use yiiconsoleController;
 8 
 9 class SendController extends Controller
10 {
11     public function actionIndex()
12     {
13         // RabbitMQ: 简单的收发模式
14 
15         // 打开一个连接和通道
16         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
17         $channel = $connection->channel();
18         // 声明一个队列
19         $channel->queue_declare('hxq', false, false, false, false);
20 
21         // 向队列发布消息
22         $msg = new AMQPMessage('Hello World!');
23 
24         // 注意这个路由key一定要设置跟队列匹配,交换机名称为空,类型为default
25         $channel->basic_publish($msg, '', 'hxq');
26         echo "[x] Sent 'Hello World!'
";
27 
28         // 关闭通道和连接
29         $channel->close();
30         $connection->close();
31     }
32 }    

    2)费端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use yiiconsoleController;
 7 
 8 class ReceiveController extends Controller
 9 {
10     public function actionIndex()
11     {
12         // RabbitMQ: 简单的收发模式
13 
14         // 打开一个连接和通道
15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
16         $channel = $connection->channel();
17         // 声明一个队列
18         $channel->queue_declare('hxq', false, false, false, false);
19         echo '[*] Waiting for messages. To exit press CTRL+C', "
";
20 
21         // 接收消息进行处理的回调函数
22         $callback = function ($msg) {
23             echo "[x] Received ", $msg->body, "
";
24         };
25 
26         // basic_consume是一个阻塞函数,在接收消息的时候调用$callback函数
27         $channel->basic_consume('hxq', '', false, true, false, false, $callback);
28         while ($channel->is_consuming()) {
29             $channel->wait();
30         }
31 
32         // 关闭通道和连接
33         $channel->close();
34         $connection->close();
35     }
36 } 

  下面我们开两个命令窗口,模拟发送端和消费端:

     a) 消费者在队列端监听:

      

     b)发送端发送消息

      

     c)消费者获取到消息,进行消费

      

     2、工作队列模式(work Queues)-资源的竞争

     工作队列是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
     这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务

     特点:

     1) 一个生产者对应多个消费者,一条消息只被一个消费者进行消费

     2)工作队列有轮询分发公平分发两种模式

      

     工作过程:

     消息产生者将消息放入队列消费者可以有多个,消费者C1,消费者C2同时监听同一个队列。

     C1、C2共同争抢当前的消息队列内容,,一条消息只能由一个消费者消费,这样就形成了资源竞争,谁的资源空闲大争抢到的可能性就大,谁先拿到谁负责消费消息。

     可能存在的问题:

     高并发情况下,默认会产生某一个消息被多个消费者共同使用。

     解决办法:

     可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用。

     注意:

    上图中虽然没有画出交换机的部分,但是原理同simple queue中阐述的一样,不再赘述。

     下面是参考代码:

     1)生产端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use PhpAmqpLibMessageAMQPMessage;
 7 use yiiconsoleController;
 8 
 9 class SendController extends Controller
10 {
11     public function actionIndex2($argv)
12     {
13         // RabbitMQ: 工作队列-资源的竞争
14         // 打开一个连接和通道
15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
16         $channel = $connection->channel();
17         // 声明一个队列,第三个参数durable(是否消息持久化):true是 false否
18         $channel->queue_declare('task_queue', false, true, false, false);
19 
20 
21         if (empty($argv)) {
22             $argv = "Hello World!";
23         }
24         // 向队列发布消息
25         $msg = new AMQPMessage($argv, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
26 
27         // 注意这个路由key一定要设置跟队列匹配
28         $channel->basic_publish($msg, '', 'task_queue');
29         echo "[x] Sent ", $argv, "
";
30 
31         // 关闭通道和连接
32         $channel->close();
33         $connection->close();
34     }
35 }

     2) 消费端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use yiiconsoleController;
 7 
 8 class ReceiveController extends Controller
 9 {
10     public function actionIndex2()
11     {
12         // RabbitMQ: 工作队列-资源的竞争
13 
14         // 打开一个连接和通道,声明一个队列
15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
16         $channel = $connection->channel();
17         // 第三个参数durable为true  消息持久化
18         $channel->queue_declare('task_queue', false, true, false, false);
19         echo '[*] Waiting for messages. To exit press CTRL+C', "
";
20 
21         // 接收消息进行处理的回调函数
22         $callback = function ($msg) {
23             echo "[x] Received ", $msg->body, "
";
24             // 模拟耗时操作
25             sleep(substr_count($msg->body, '.'));
26             echo "[x] Done", "
";
27 
28             // 手动ack
29             $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
30 
31         };
32  
33         // 公平调度-只有consumer已经处理并确认了上一条message时queue才分派新的message给它,为了测试使用前后的效果,我们先把下面代码屏蔽掉,等会儿再打开
34         // $channel->basic_qos(null, 1, null);
35 
36         // basic_consume第四个参数:no_ack true 自动应答 false手动应答
37         $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
38         while ($channel->is_consuming()) {
39             $channel->wait();
40         }
41 
42         // 关闭通道和连接
43         $channel->close();
44         $connection->close();
45     }
46 }    

    为了模拟多个worker,我们这里开三个命令窗口,依次开local,local2作为消费端,local3为消费端,然后进行如下操作:

    

    我们从上面图中可以发现,它仍旧没有按照我们期望的那样进行分发。比如图上有两个工作者(workers),local1中工作者处理的第一条消息耗时5秒,local2中工作者处理的第一条消息耗时1秒。明明local1中的工作者还没有处理完,响应消息。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,将sleep(3)的这条消息依然发给了local1。

    这是因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

    公平调度

    

     

     


     为了解决上面的问题,我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

     于是我们把消费者端的这段代码注释打开:

     $channel->basic_qos(null, 1, null);

     再测试下:

     

     这就比较符合我们的预期了,大家可以自行测试下。  

     队列大小

     如果所有的工作者都处于繁忙状态,那么队列就会被填满。此时需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

     3、发布-订阅模式(Publish/SubScribe)

     1)一个生产者,多个消费者

     2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

 

     工作过程:

     RabbitMQ把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,无视binding key。一个消息可以被多个消费者消费。

     注意:

     如果消息发送到没有队列绑定的交换机时,消息将会消失,因为交换机没有存储消息的能力只有队列才有存储消息的能力。

     应用场景

     为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在我们的这个日志系统中,所有正在运行的接收方程序都会接收消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接收者(receiver)把日志输出到屏幕上。最终,日志消息被广播给所有的接收者(receivers)。

     下面是参考代码:

     1)生产端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use PhpAmqpLibMessageAMQPMessage;
 7 use yiiconsoleController;
 8 
 9 class SendController extends Controller
10 {
11     public function actionIndex3($argv)
12     {
13         // RabbitMQ: 工作队列-发布/订阅(fanout)
14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
15         $channel = $connection->channel();
16         // 声明交换机
17         $channel->exchange_declare('logs', 'fanout', false, false, false);
18 
19         if (empty($argv)) {
20             $argv = "info: Hello World!";
21         }
22 
23         // 发送消息到我们命名为logs的交换机
24         $msg = new AMQPMessage($argv);
25         $channel->basic_publish($msg, 'logs');
26         echo "[x] Sent ", $argv, "
";
27 
28         // 关闭通道和连接
29         $channel->close();
30         $connection->close();
31     }
32 }

     2)消费端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use yiiconsoleController;
 7 
 8 class ReceiveController extends Controller
 9 {
10     public function actionIndex3()
11     {
12         // RabbitMQ: 工作队列-发布/订阅(fanout)
13         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
14         $channel = $connection->channel();
15 
16         // 声明交换机
17         $channel->exchange_declare('logs', 'fanout', false, false, false);
18         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
19 
20         // 绑定队列到交换机
21         $channel->queue_bind($queue_name, 'logs');
22         echo ' [*] Waiting for logs. To exit press CTRL+C', "
";
23 
24         // 定义接收消息进行处理的回调函数
25         $callback = function ($msg) {
26             echo "[x] Received ", $msg->body, "
";
27         };
28 
29         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
30 
31         while (count($channel->callbacks)) {
32             $channel->wait();
33         }
34 
35         // 关闭通道和连接
36         $channel->close();
37         $connection->close();
38     }
39 }   

      我们来模拟实现一个日志系统,分别开两个窗口来作为消费端监听,其中一个将监听消息写入日志,另外一个直接命令窗口输出,如下图:

      

     再开一个窗口来发送消息:

     

    4、路由模式(routing)

    生产者将消息发送到direct交换机,它会把消息路由到那些binding key 与 routing key 完全匹配的queue中。在相应队列监听的消费者才能消费消息。这样就能实现消费者有选择的去消费消息。

    

   说明: 如果routing key为black或者green,那么交换机会将消息路由到队列Q2中。

   

   说明: 如果routing key为black,那么交换机会将消息路由到队列Q1和队列Q2中。

   应用场景

   我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。

   下面参考代码:

   1)生产端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use PhpAmqpLibMessageAMQPMessage;
 7 use yiiconsoleController;
 8 
 9 class SendController extends Controller
10 {
11     public function actionIndex4($argv)
12     {
13         // RabbitMQ: 路由模式
14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
15         $channel = $connection->channel();
16 
17         // 声明交换机
18         $channel->exchange_declare('direct_logs', 'direct', false, false, false);
19         if (strpos($argv, '/') !== false) {
20             $arr = explode('/', $argv);
21             // routing_key
22             $severity = $arr[0];
23             // 消息内容
24             $data = $arr[1];
25         } else {
26             echo '参数错误';
27             return;
28         }
29         // 发送消息到我们命名为direct_logs的交换机
30         $msg = new AMQPMessage($data);
31         $channel->basic_publish($msg, 'direct_logs', $severity);
32         echo "[x] Sent ", $data, "
";
33 
34         // 关闭通道和连接
35         $channel->close();
36         $connection->close();
37     }
38 }

    2)消费端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use yiiconsoleController;
 7 
 8 class ReceiveController extends Controller
 9 {
10     public function actionIndex4($argv)
11     {
12         // RabbitMQ: 路由模式
13 
14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
15         $channel = $connection->channel();
16 
17         // 声明交换机
18         $channel->exchange_declare('direct_logs', 'direct', false, false, false);
19         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
20 
21         // 为我们感兴趣的每个严重级别分别创建一个新的绑定
22         if (strpos($argv, '/') !== false) {
23             $severities = explode('/', $argv);
24             foreach ($severities as $severity) {
25                 // 第三个参数为routing_key
26                 $channel->queue_bind($queue_name, 'direct_logs', $severity);
27             }
28         } else {
29             $severity = $argv;
30             $channel->queue_bind($queue_name, 'direct_logs', $severity);
31         }
32 
33         echo ' [*] Waiting for logs. To exit press CTRL+C', "
";
34 
35         // 接收消息进行处理的回调函数
36         $callback = function ($msg) {
37             echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
";
38         };
39         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
40         while (count($channel->callbacks)) {
41             $channel->wait();
42         }
43 
44         // 关闭通道和连接
45         $channel->close();
46         $connection->close();
47     }
48 }    

    消费端监听消息:

     

    生产端发送消息:

    

   5、主题模式(Topic) - 路由模式的一种

      主题交换机是很强大的,它可以表现出跟其他交换机类似的行为。

  •       当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
  •       当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

     1)"#" 和 "*" 符号代表通配符

     2)""#"代表零个或者多个单词, "*" 代表一个单词

     3)路由功能添加模糊匹配

     4)消息产生者产生消息,把消息交给交换机

     5)单词之间用英文句点"."隔开

      

     工作过程:

     交换机根据binding key的规则,使用routing Key来模糊匹配到对应的队列,由队列的监听消费者接收消息消费。如果没有匹配到相应队列,则消息被丢弃。

     1)生产端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use PhpAmqpLibMessageAMQPMessage;
 7 use yiiconsoleController;
 8 
 9 class SendController extends Controller
10 {
11     public function actionIndex5($argv)
12     {
13         // RabbitMQ: 主题模式
14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
15         $channel = $connection->channel();
16 
17         // 声明交换机
18         $channel->exchange_declare('topic_logs', 'topic', false, false, false);
19         if (strpos($argv, '/') !== false) {
20             $arr = explode('/', $argv);
21             // routing_key
22             $routing_key = $arr[0];
23             // 消息内容
24             $data = $arr[1];
25         } else {
26             echo '参数错误';
27             return;
28         }
29 
30         // 发送消息
31         $msg = new AMQPMessage($data);
32         $channel->basic_publish($msg, 'topic_logs', $routing_key);
33         echo "[x] Sent ", $routing_key, ':', $data, "
";
34 
35         // 关闭通道和连接
36         $channel->close();
37         $connection->close();
38     }
39 }

     2)消费端

 1 <?php
 2 
 3 namespace consolecontrollers;
 4 
 5 use PhpAmqpLibConnectionAMQPStreamConnection;
 6 use yiiconsoleController;
 7 
 8 class ReceiveController extends Controller
 9 {
10     public function actionIndex5($argv)
11     {
12         // RabbitMQ: 主题模式
13         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
14         $channel = $connection->channel();
15 
16         // 声明交换机
17         $channel->exchange_declare('topic_logs', 'topic', false, false, false);
18         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
19 
20         // 多个队列和这个名称为topic_logs的交换机绑定
21         if (strpos($argv, '/') !== false) {
22             $binding_keys = explode('/', $argv);
23             foreach ($binding_keys as $binding_key) {
24                 // 第三个参数为routing_key
25                 $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
26             }
27         } else {
28             $binding_key = $argv;
29             $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
30         }
31 
32         echo ' [*] Waiting for logs. To exit press CTRL+C', "
";
33 
34         // 接收消息进行处理的回调函数
35         $callback = function ($msg) {
36             echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
";
37         };
38         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
39         while (count($channel->callbacks)) {
40             $channel->wait();
41         }
42 
43         // 关闭通道和连接
44         $channel->close();
45         $connection->close();
46     }
47 
48 }    

    我们分别开4个窗口,监听消息

   

    发送消息后

    

   6、RPC模式- 远程过程调用

    这里面有两个重要的概念:

    1) replyTo:  存储回调队列的名称

    2) correlationId:  唯一标识本次的请求,主要用于RPC调用。

 

    工作过程:

     使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。

     但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。

     但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件,实现一次双向的消息传递

     客户端和服务端既是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

 

     纸上谈兵终觉浅,后面有空再来补充一些RabbitMQ在项目中使用的案例。

     

参考链接:

https://www.rabbitmq.com/getstarted.html

原文地址:https://www.cnblogs.com/hld123/p/14687401.html