RabbitMQ PHP 代码示例

docker 启动 rabbitmq,其中 --network=host 不必绑定任何端口,相当于宿主机本地,直接通过 localhost 就能访问。
 
docker run -d --name rabbit --network=host rabbitmq
docker exec -it rabbit bash
容器中> rabbitmq-plugins enable rabbitmq_management
 
然后浏览器访问: http://localhost:15672 登录账号默认: guest  gues
 
receive.php 文件
<?php
/**
 * 接收消息
 * php receive.php
 */

$config = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
];

#定义名称
define('ExchangeName', 'exchange_name_1');
define('QueueName',    'queue_name_1');
define('RoutingKey',   'routing_key_1');

//创建连接
$conn = new AMQPConnection($config);
if(!$conn->connect()) die("Cannot connect to the broker!
");

//创建通道
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName(ExchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化交换机
$ex->declareExchange();

//创建队列
$queue = new AMQPQueue($channel);
$queue->setName(QueueName);
$queue->setFlags(AMQP_DURABLE);//持久化队列
$queue->declareQueue();

//队列绑定到交换机
$queue->bind(ExchangeName,  RoutingKey);

//接收消息
$queue->consume(function ($envelope, $queue){

    $msg = $envelope->getBody();
    p($msg);

}, AMQP_AUTOACK); //自动应答


$conn->disconnect();

sent.php

<?php
/**
 * 发送消息
 * php sent.php <message>
 *
 */

$config = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
];
#定义名称
define('ExchangeName', 'exchange_name_1');
define('QueueName',    'queue_name_1');
define('RoutingKey',   'routing_key_1');


//创建连接和channel
$conn = new AMQPConnection($config);
if(!$conn->connect()) die("Cannot connect to the broker!
");
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName(ExchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
$ex->setFlags(AMQP_DURABLE); //持久化交换机
$ex->declareExchange();


//创建队列
$queue = new AMQPQueue($channel);
$queue->setName(QueueName);
$queue->setFlags(AMQP_DURABLE); //持久化队列
$queue->declareQueue();

//队列绑定到交换机
$queue->bind(ExchangeName,  RoutingKey);


echo "Send: $argv[1] 
";
$ex->publish($argv[1],  RoutingKey);

打开两个命令行,分别执行两个命令

php sent.php hello

php receive.php

封装一个类

<?php


class Amqp extends AMQPConnection {

//    protected AMQPChannel $channel;
//    protected AMQPExchange $exchange;
//    protected AMQPQueue $queue;

    protected  $channel;
    protected  $exchange;
    protected  $queue;

    public function __construct($host='127.0.0.1', $port=5672, $user='guest', $passwd='guest', $vhost='/'){

        $credentials = is_array($host)? $host : [
            'host'      => $host,
            'port'      => $port,
            'login'     => $user,
            'password'  => $passwd,
            'vhost'     => $vhost
        ];
        parent::__construct($credentials);

    }


    /**
     * 创建频道
     * @return AMQPChannel
     * @throws AMQPConnectionException
     */
    public function channel(){

        if(!$this->channel){
            parent::connect();
            $this->channel = new AMQPChannel($this);
        }
        return $this->channel;
    }

    /**
     * 创建交换器
     * @param string $name
     * @param string $type
     * @param null $flags
     * @return AMQPExchange
     * @throws AMQPConnectionException
     * @throws AMQPExchangeException
     */
    public function exchange($name='', $type='', $flags=null){

        if(!$this->exchange){
            $this->exchange = new AMQPExchange($this->channel());

        }
        $name && $this->exchange->setName($name);
        $type && $this->exchange->setType($type);
        is_integer($flags) && $this->exchange->setFlags($flags);

        return $this->exchange;

    }

    /**
     * 创建队列
     * @param string $name
     * @param null $flags
     * @return AMQPQueue
     * @throws AMQPConnectionException
     * @throws AMQPQueueException
     */
    public function queue($name='', $flags=null){

        if(!$this->queue){
            $this->queue = new AMQPQueue($this->channel());
        }

        $name && $this->queue->setName($name);
        is_integer($flags) && $this->queue->setFlags($flags);

        return $this->queue;

    }

    /**
     * 发布交换器
     * @return bool
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPExchangeException
     */
    public function declareExchange(){
        return $this->exchange()->declareExchange();
    }

    /**
     * 发布队列
     * @return int
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPQueueException
     */
    public function declareQueue(){
        return $this->queue()->declareQueue();
    }

    /**
     * 绑定队列
     * @param string $routing_key
     * @param array $arguments
     * @return bool
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPExchangeException
     * @throws AMQPQueueException
     */
    public function bindQueue($routing_key='', $arguments=[]){
        return $this->queue()->bind($this->exchange()->getName(), $routing_key, $arguments);
    }


    /**
     * 发布消息
     * @param callable|null $callback
     * @param int $flags
     * @param null $consumerTag
     * @return bool
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPExchangeException
     */
    public function publish($message, $routing_key=null,  $flags=AMQP_NOPARAM,  array $attributes=[]){
        return $this->exchange()->publish($message, $routing_key, $flags, $attributes);
    }

    public function consume(callable $callback=null, $flags=AMQP_NOPARAM, $consumerTag = null){
        $this->queue()->consume($callback, $flags, $consumerTag);
    }

    /**
     * 删除交换器
     * @param null $name
     * @param int $flags
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPExchangeException
     */
    public function deleteExchange($name = null, $flags=AMQP_NOPARAM){
        $this->exchange()->delete($name, $flags);
    }

    /**
     * 删除队列
     * @param int $flags
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     * @throws AMQPQueueException
     */
    public function deleteQueue($flags=AMQP_NOPARAM){
        $this->queue()->delete($flags);
    }


}

测试:test.php

#定义名称
define('ExchangeName', 'exchange_name_15');
define('QueueName',    'queue_name_15');
define('RoutingKey',   'routing_key_15');


//如果带有命令行参数,则表示发送消息
if(count($argv) > 1){

    $amqp = new Amqp();
    $amqp->channel();
    $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
    $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
    $amqp->bindQueue(RoutingKey);
    $amqp->publish($argv[1], RoutingKey);
    $amqp->disconnect();
}else{

    $amqp = new Amqp();
    $amqp->channel();
    $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
    $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
    $amqp->bindQueue(RoutingKey);

    //接收消息
    $amqp->consume(function($envelope, $queue){
        $msg = $envelope->getBody();
        echo $msg;
    }, AMQP_AUTOACK); //自动应答

}
原文地址:https://www.cnblogs.com/zbseoag/p/14279249.html