PHP+RabbitMQ订单消息发布与订阅

订单模拟发布

<?php

/**
 * 发布消息
 * @Author: hdj
 * @Date:   2020-07-22 16:15:22
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$exchange = 'orders';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'direct', false, false, false);
//生成订单号
$order_sn=time();
//订单生成后 处理积分
$arr = ['id' => rand(111,999),'order_sn' => 'score_ '. $order_sn];
$data = json_encode($arr);
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange, 'score');
echo 'Send score message: ' . $data . PHP_EOL;

//订单生成后 处理优惠券
$arr1 = ['id' => rand(111,999),'order_sn' => 'coupon_ '. $order_sn];
$data1 = json_encode($arr1);
$msg1 = new AMQPMessage($data1);
$channel->basic_publish($msg1, $exchange, 'coupon');
echo 'Send coupon message: ' . $data1 . PHP_EOL;


$channel->close();
$connection->close();

积分订阅

<?php

/**
 * 订阅消息
 * @Author: hdj
 * @Date:   2020-07-22 16:22:50
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;

$exchange = 'orders';
$routerKey = 'score'; //只消费积分

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queueName, $exchange, $routerKey);

echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;
$callback = function ($msg) {
    //echo " Received message:", $msg->body, PHP_EOL;
    echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    sleep(1);  //模拟耗时执行
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

优惠券订阅

<?php

/**
 * 订阅消息
 * @Author: hdj
 * @Date:   2020-07-22 16:24:57
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;

$exchange = 'orders';
$routerKey = 'coupon'; //只消费优惠券

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queueName, $exchange, $routerKey);

echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;
$callback = function ($msg) {
    //echo " Received message:", $msg->body, PHP_EOL;
    echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    sleep(1);  //模拟耗时执行
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

原文地址:https://www.cnblogs.com/houdj/p/13361873.html