PHP RabbitMQ实现简单的延迟队列

1.TTL+死信队列(DLX)实现

TTL(x-message-ttl)是指队列中的消息在丢弃之前的可存活时间。死信队列是放置没有被成功消费且超过了TTL生存时间消息的队列,如果消息没有在指定的TTL时间内被成功消费,并且给需要延迟执行的队列绑定了死信交换机和死信队列,将信息publish到死信交换机中后可被绑定交换机的死信队列消费,利用这一特性可以实现延迟队列。

消息队列中的消息会在一下几种情况下变成死信

消息被拒绝(basic.reject / basic.nack),并且requeue = false;
消息TTL过期;
队列达到最大长度;

在声明被延迟的任务队列前,需要配置如下参数。x-message-ttl设置队列中消息的生存期,超过这个时间消息将变成死信,也可以在单条消息publish的时候设置ttl,rabbitmq会取两者中较小者。

$arguments = [
    'x-message-ttl' => 6000, //消息在丢弃之前的可存活时间
    'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机名字
    'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由键
];
$queue->setArguments($arguments);
 

 

消费者代码

//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);

//**********************创建一个用于存放死信的交换机和队列*************
$deadExchangeName = 'dead_exchange';
$deadQueueName    = 'delayed_order';
$deadRouteKey     = 'delayed_order';

$deadExchange = new AMQPExchange($channel);
$deadExchange->setName($deadExchangeName);
$deadExchange->setType(AMQP_EX_TYPE_DIRECT);
$deadExchange->declareExchange();

$deadQueue = new AMQPQueue($channel);
$deadQueue->setName($deadQueueName);
$deadQueue->declareQueue();
$deadQueue->bind($deadExchange->getName(), $deadRouteKey);


//***********************创建被延迟的交换机和消息队列********************
$exchangeName = 'exchange1';
$queueName    = 'order';
$routeKey     = 'order';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
// $exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$arguments = [
    'x-message-ttl' => 6000,
    'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机
    'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey
];

// 设置持久性
// $queue->setFlags(AMQP_DURABLE);

$queue->setArguments($arguments);
// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 向服务器队列推送10条消息
$msg = 'hello world 1';
$exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);

生产者代码

$exchangeName = 'dead_exchange';
$queueName    = 'delayed_order';
$routeKey     = 'delayed_order';
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!
");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
// $exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
// $queue->setArgument('x-message-ttl', 5000);
// 设置持久性
// $queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
$queue->consume('receive');

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "
";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  

  

原文地址:https://www.cnblogs.com/xiangdongsheng/p/14264493.html