php 操作RabbitMQ

基本流程图

如果exchange 没有绑定queue,则消息将会被丢弃
如果创建exchange,queue,并且已经绑定了,则可以直接使用
为了防止脚本出问题 可以配合supervisor

安装

  • 从网站 https://packagist.org 搜索rabbitmq插件
  • 使用composer安装插件composer require php-amqplib/php-amqplib

使用

  • 1.连接RabbitMQ服务器
  • 2.开始一个新的 channel
  • 3.新建一个exchange
  • 4.新建一个queue
  • 5.绑定queue和exchange
  • 6.发布一个消息
  • 7.建立一个消费者并注册一个回调函数
  • 8.监听数据

新建连接和channel

    <?php
    require "./vendor/autoload.php";
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    
    $host   = "192.168.110.134";
    $port   = 5672;
    $user   = "test";
    $pass   = "test";
    
    $vhost  = "/";
    
    try{
        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
    }catch (Exception $e){
        echo 'Caught exception: ',  $e->getMessage(), "
";die;
    }
    
    $channel = $connection->channel();

新建一个exchange

/*
    name: $exchange
    type: fanout
    passive: false // don't check is an exchange with the same name exists
    durable: false // the exchange won't survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/
try{
    $name = 'example_direct_exchange';
    $type = "direct";
    $passive = false;
    $durable = true;
    $auto_delete = true;
    $channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);

}catch (Exception $e){
    echo 'Caught exception: ',  $e->getMessage(), "
";die;
}

参数 name

exchange名称

参数 type

exchange类型
fanout 是广播类型的消息 会给所有绑定的queue发送数据

参数 passive

true    

1.如果exchange已存在 则直接连接 并且不检查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;

2.如果exchange不存在 则直接报错

false
1.如果exchange不存在 则创建新的exchange

2.如果exchange已存在 则判断配置是否相同。如果配置不相同 则直接报错。比如已存在的exchange是fanout,新需要建立的是direct,会报错。

参数 auto_delete

true 
当最后一个消费者取消订阅之后 exchange会被自动删除 一般用于临时exchange

新建一个queue

/*
    name: $queue    // should be unique in fanout exchange.
    passive: false  // don't check if a queue with the same name exists
    durable: false // the queue will not survive server restarts
    exclusive: false // the queue might be accessed by other channels
    auto_delete: true //the queue will be deleted once the channel is closed.
*/
$queue1 = 'example_direct_queue_1';

$channel->queue_declare($queue1, false, true, false, false);

将queue和exchange绑定起来

    $queue1 = 'example_direct_queue_1';
    $exchange_name = 'example_direct_exchange';
    
    $channel->queue_bind($queue1, $exchange_name);

发布一个消息

$exchange_name = 'example_direct_exchange';
$messageBody = array(
    'example_direct_value'=>date('Y-m-d H:i:s'),
);
$message = new AMQPMessage(json_encode($messageBody));
$channel->basic_publish($message, $exchange_name);

建立一个消费者并注册一个回调函数

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: Tells the server if the consumer will acknowledge the messages.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
*/

$consumerTag = 'consumer';

$queue  = 'example_direct_queue_1';

$channel->basic_consume($queue, "", false, false, false, false,function($msg){


    $message = json_decode($msg->body, true);

    file_put_contents("./mq.log", $message,FILE_APPEND);

    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});

参数no_ack

true
消息只有在返回一个ack之后,才会被删除

false
消息被取出之后 会被立即删除

监听数据

try {
    while (count($channel->callbacks)) {
        $channel->wait();
    }
}
catch(PhpAmqpLibExceptionAMQPTimeoutException $e){
    $channel->close();
    $channel->close();
}
原文地址:https://www.cnblogs.com/alin-qu/p/8312874.html