rabbitMQ 通过Topic模式分发消息

Direct和Topic两种模式类似Mysql语言中精确和模糊查询,在Topic模式下有两个特殊字符,类似MySQL “%” 字符

  * (星号) 代表任意 一个单词

  # (井号) 0个或者多个单词

Topic模式可以很好的用于多维度场景。一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化

  • RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。

  • RoutingKey为“red.critical.high”的日志会只投递到queue2。

  • RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。

测试结果

生产者:topic.php

<?php
// 生产者 p.php
//配置信息
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_topic';
$routeKey1    = "black.critical.high";
$routeKey2    = "red.critical.high";
$routeKey3    = "white.critical.high";

$message1 = 'black-critical-high!';
$message2 = 'red-critical-high!';
$message3 = 'white-critical-high!';

//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

$exchange->publish($message1, $routeKey1);
$exchange->publish($message2, $routeKey2);
$exchange->publish($message3, $routeKey3);

  

消费者:c_topic1.php

<?php
// 消费者 c.php
//配置信息
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_topic';
$queueName    = 'log_1';
$routeKey     = 'black.critical.high';
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 设置持久性
$queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
$queue->consume('receive');

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "
";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

消费者:c_topic2.php

<?php
// 消费者 c.php
//配置信息
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_topic';
$queueName    = 'log2';
$routeKey     = '#.high';
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 设置持久性
$queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
$queue->consume('receive');

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "
";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  

消费者:c_topic3.php

<?php
// 消费者 c.php
//配置信息
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_topic';
$queueName    = 'log3';
$routeKey     = 'white.critical.*';
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 设置持久性
$queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
$queue->consume('receive');

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "
";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  

  

原文地址:https://www.cnblogs.com/xiangdongsheng/p/14259590.html