rabbitmq之php客户端使用实例

一、 名词解释

  1. Connection、Channel

    Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

    Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

    Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

  2. Queue:

    Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

    RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费



    多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

  3. Message acknowledgment

    在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
    这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…

  4. Message durability

    如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

  5. Prefetch count

    前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息


  1. Exchange

    在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

  2. routing key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
    在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
    RabbitMQ为routing key设定的长度限制为255 bytes。

  3. Binding

    RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。


Binding key 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。 binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

二、生产者模型实例

  1. 生产者----队列-----消费者 模型实例

    (1)生产者代码
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    //创建连接
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    //创建信道
    $channel = $connection->channel();
    //声明队列
    $channel->queue_declare('hello', false, false, false, false);
    $msg = new AMQPMessage('Hello World!');
    //发送消息
    $channel->basic_publish($msg, '', 'hello');
    echo " [x] Sent 'Hello World!'
    ";
    $channel->close();
    $connection->close();
    

    (2)消费者代码

    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    //创建连接
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    //创建信道
    $channel = $connection->channel();
    //声明队列
    $channel->queue_declare('hello', false, false, false, false);
    echo " [*] Waiting for messages. To exit press CTRL+C
    ";
    //消费者回调函数
    $callback = function ($msg) {
        echo ' [x] Received ', $msg->body, "
    ";
    };
    //消费者消费数据
    $channel->basic_consume('hello', '', false, true, false, false, $callback);
    //等待
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();
    

2.生产者---交换机(fanout)---队列-----消费者模型

交换机Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。

(1)生产者代码

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
//创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//创建信道
$channel = $connection->channel();
//声明fanout类型交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
   $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
//发送消息给交换机
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "
";
$channel->close();
$connection->close();

(2)消费者代码

    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    //创建连接
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    //创建信道
    $channel = $connection->channel();
    //声明交换机
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    //临时队列,queue名字是空字符串的队列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //把队列和交换机绑定
    $channel->queue_bind($queue_name, 'logs');
    echo " [*] Waiting for logs. To exit press CTRL+C
";
    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "
";
    };
    //消费者消费消息
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    //等待消息
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();

3.生产者---交换机(direct)---队列-----消费者 模型

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

//info 是routing key

(1)生产者代码

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
//创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//创建信道
$channel = $connection->channel();
//声明direct类型交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage($data);
//绑定交换机和路由key
$channel->basic_publish($msg, 'direct_logs', $severity);
echo ' [x] Sent ', $severity, ':', $data, "
";
$channel->close();
$connection->close();

(2)消费者代码

php . eceive_logs_direct.php info

info是 bind key


require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
//创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//创建信道
$channel = $connection->channel();
//声明direct类型交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]
");
    exit(1);
}

foreach ($severities as $severity) {
    //绑定交换机/队列/路由key(绑定key)
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C
";
$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

4.生产者---交换机(topic)---队列-----消费者 模型

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

图解:
以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

前面讲到direct类型的Exchange路由规则是完全匹配bindingkey与routingkey,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

(1)生产者代码

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
//php emit_log_topic.php  a.a    data111
//a.a  routingkey
//data1 数据
//声明链接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//声明信道
$channel = $connection->channel();
//声明topic 类型交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);
$routing_key= 'a.a';
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World123!";
}
//delivery_mode  2: 消息持久化
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo ' [x] Sent ', $routing_key, ':', $data, "
";
$channel->close();
$connection->close();

(2)消费者代码

//php receive_logs_topic.php  two  a.*
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
//创建连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
//创建信道
$channel = $connection->channel();
//声明topic类型交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);
list($queue_name) = array_slice($argv, 1,1);
if (empty($queue_name)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [queue_name]
");
    exit(1);
}
//第四个参数:消费者是否唯一
$channel->queue_declare($queue_name, false, true, false, false);
$binding_keys = array_slice($argv, 2);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]
");
    exit(1);
}
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo " [*] Waiting for logs. To exit press CTRL+C
";
//在RabbitMQ中有一个prefetch_count的概念,这个参数的意思是允许Consumer最多同时处理几个任务。我的版本的RabbitMQ默认这个参数是3,也就是说如果某一个Consumer在收到消息后没有发送ACK确认包,RabbitMQ就会任务Consumer还在处理任务,当有3个消息都没有发送ACK确认包时,RabbitMQ就不会再发送消息给该Consumer。
$callback = function ($msg) {
    sleep(3);
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
";

};
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
原文地址:https://www.cnblogs.com/zxqblogrecord/p/14059698.html