【原创】RabbitMQ教程:php实现

参考:https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ (php-amqplib扩展库:demo

参考:https://zhuanlan.zhihu.com/p/63700605 (知乎)

简介:

不要用的太复杂,一般定义好一个直连交换机(route_key)和一个扇形交换机就可以用了。

生产端使用流程:

1,创建连接:connection

$connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');

2,创建信道:channel

$channel = $connection->channel();

3,创建消息:message

$msg = new AMQPMessage('Hello World!');

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
       );//持久化的消息,与queue都需要定义好持久化参数

4,创建队列:queue

#空交换机的时候才需要定义queue
$channel->queue_declare('hello', false, false, false, false);

$channel->queue_declare('task_queue', false, true, false, false);//持久的消息,第3个参数

5,创建交换机:exchange

#定义了交换机就不需要定义queue
$channel
->exchange_declare('logs', 'fanout', false, false, false); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->exchange_declare('topic_logs', 'topic', false, false, false);

6,发布消息:publish

$channel->basic_publish($msg, '', 'task_queue');

$channel->basic_publish($msg, 'logs');//扇形交换机

$channel->basic_publish($msg, 'direct_logs', $severity);//直连交换机

$channel->basic_publish($msg, 'topic_logs', $routing_key);//主题交换机

7,关闭信道和连接:

$channel->closs();
$connection->close();

消费端使用流程

1,创建连接:connection

$connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');

2,创建信道:channel

$channel = $connection->channel();

3,申明队列:queue(消费端必须有queue)

//空交换机时,不需要申明交换机
$channel->queue_declare('hello', false, false, false, false);

//扇形,直连,主题交换机时
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 

4,申明交换机:exchange

//扇形交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);

//直连交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

//主题交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

5,交换机绑定到队列:

//绑定扇形交换机
$channel->queue_bind($queue_name, 'logs');

//绑定直连交换机
foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

//绑定主题交换机
foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

6,消费队列;

//空交换机时
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}

//扇形,直连,主题交换机时
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}

7,关闭信道和连接:

$channel->closs();
$connection->close();
原文地址:https://www.cnblogs.com/tkzc2013/p/13547205.html