PHP RabbitMQ 教程(三)

发布/订阅

我们在上一节创建了一个工作队列,并假定队列对应的任务传送给了某个客户端。在这一章节我们会做一些完全不一样的东西–我们会发送一条消息到多个消费者,也称之为“发布/订阅”模式。

为了说明这个模式,我们会创建一个简单的日志系统(logging system,以下简称日志系统),它由两个程序组成–第一个是发送日志信息,第二个是接收日志并打印。

日志系统的每一个运行的接收端程序都会接收信息,这样就可以运行一个接收端就把日志保存到硬盘里,同时运行另一个接收端去实时显示日志到屏幕。

本质上,日志内容是广播给所有的接收端的。

交换器

在之前的章节中我们从一个队列里发送和接收消息,现在该把完整的RabbitMQ消息模型介绍给大家了。

让我们快速的回看一遍在之前的章节中的内容:

>生产者是一个用来发送消息的程序

>队列是一个存储消息的缓冲区

>消费者是一个接收消息的程序

RabbitMQ消息模型的核心思想是,生产者永远不会直接发送给任何消息队列,实际上,生产者一般情况下甚至不知道消息应该发送给哪个队列。

生产者只能发送消息到交换器中,交换器非常简单。一方面从生产者接收消息,另一方面把消息推送到队列中。交换器必须知道如何处理接收到的消息,是推送到某个队列?推送到多个队列?还是丢弃这条消息。这个规则通过交换器类型(exchange type)来指定。

这里是交换器的几个类型:direct,topic,headers,fanout。这里我们主要关注最后一个–fanout,创建一个类型为 fanout 的交换器,命名为 logs。

$channel->exchange_declare('logs','fanout',false,false,false);

fanout交换器非常简单,你可以从名称中猜出它的功能,它把所有接收到的消息广播给所有它知道的队列,这也正是我们的日志系统需要的功能。

列出交换器

可以使用rabbitmqctl 命令列出服务器上的所有交换器:

sudo rabbitmqctl list_exchanges

Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

结果中有一些amq.*和一些未命名的交换器,这是一些默认创建的交换器,它们不太可能是现在需要用到的。

未命名交换器

在之前的章节中我们对交换器一无所知,直到可以发送消息给队列。大概是因为我们当时正在使用一个以空字符串“”定义的默认的交换器。

回想一下之前怎么发布消息:

$channel->basic_publish($msg,'','hello');

这里就是使用默认或者说未命名的交换器:消息被routing_key的值 Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish

现在,可以发布消息到这个队列。

$channel->exchange_declare('logs','fanout',false,false,false);
$channel->basic_publish($msg,'logs');

临时队列

也许你还记得在之前我们使用了一个指定的队列(还记得 hello 队列 和 task_queue 队列吗?)。可以命名一个队列是至关重要的–我们需要指定一个worker到同一个队列。当想让生产者和消费者使用同一个队列时给队列命名是非常重要的。

但是在我们的日志系统中情况不同了,我们想要接收所有的消息,不仅仅是其中的一部分,我们关心的是最新的消息而不是旧的,因此需要做两件事。

首先,当连接到RabbitMQ时,需要一个空的队列,可以手动创建一个名字随机的队列,或者,更好的办法是,让服务器为我们随机选一个队列名字。

其次,一旦与消费者失去连接,队列需要自动删除。大专栏  PHP RabbitMQ 教程(三)p>

php-amqplib中,当我们创建了一个名字为空的队列时,实际上是创建了一个被生成了名字的非持久化的队列。

list($queue_name, ,) = $channel->queue_declare("");

方法执行后,$queue_name变量包含了一个RabbitMQ生成的字符串。比如也许是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

当连接被关闭的时候,队列也会被删掉,因为队列是独有的。

绑定(Bindlings)

我们已经创建了一个fanout类型的交换器和一个队列。现在需要让交换器发送消息给队列。交换器和队列之间的关系称之为绑定(binding)

$channel->queue_bind($queue_name,'logs');

现在开始,logs 交换器会把消息附加到队列中。

列出绑定(Listing bindings)

可以使用 rabbitmqctl list_bindings列出所有存在的正在使用的绑定。

整合

发送日志消息的生产者,与之前的代码看起来没什么不同,最重要的变化是现在想要发送消息到我们的 logs 交换器中,需要在发送时提供一个routing_key,但是在 fanout类型的交换器中这个值是可以忽略的。下边是emit_log.php的代码。

<?php

require_once __DIR__ .'/verdor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $channel->channel();

$channel->exchange_declare('logs','fanout',false,false,false);

$data = implode(' ',array_slice($argv,1));

if(empty($data)) $data = "info:Hello World";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg,'logs');

echo "[x]Sent ",$data,"n";

$channel->close();
$connection->close();
?>

(emit_log.php)

如你所见,建立连接后声明了交换器,这一步是必须的,因为发送消息到一个不存在的交换器是被禁止的。

如果还没有队列绑定到交换器,信息会丢失,但是这对于我们是可以的,如果没有消费者监听,我们可以安全的丢弃消息。

receive_logs.php:

<?php

require_once __DIR__ .'/vendor/autoload.php';
use PhpAmqpLibConnectionQMAPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_bind($queue_name,'logs');

echo '[*] Waiting for logs. To exit press CTRL+C',"n";

$callback = function($msg){
	echo '[x]',$msg->body,"n";
};

$channel->basic_consume($queue_name,'',false,true,false,false,$callback);

whild(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

(receive_logs.php)

如果想保存日志到文件中,可以在命令中输入

php receive_logs.php > logs_from_rabbit.log

如果想在屏幕上查看日志,新打开一个终端并运行:

php receive_logs.php

发送日志:

php emit_log.php

使用 rabbitmqctl list_bindings 可以确认代码确实创建了绑定和队列,当两个receive_logs.php在运行的时候会看到类似这样的:

sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

对于结果的解释很简单,logs交换器中的数据发送到两个服务器指定的队列,而这正是我们要实现的。

想要弄明白怎样去监听部分消息,转到第四部分。

原文地址:Publish/Subscribe

原文地址:https://www.cnblogs.com/lijianming180/p/12099814.html