1.TTL+死信队列(DLX)实现
TTL(x-message-ttl)是指队列中的消息在丢弃之前的可存活时间。死信队列是放置没有被成功消费且超过了TTL生存时间消息的队列,如果消息没有在指定的TTL时间内被成功消费,并且给需要延迟执行的队列绑定了死信交换机和死信队列,将信息publish到死信交换机中后可被绑定交换机的死信队列消费,利用这一特性可以实现延迟队列。
消息队列中的消息会在一下几种情况下变成死信
消息被拒绝(basic.reject / basic.nack),并且requeue = false; 消息TTL过期; 队列达到最大长度;
在声明被延迟的任务队列前,需要配置如下参数。x-message-ttl设置队列中消息的生存期,超过这个时间消息将变成死信,也可以在单条消息publish的时候设置ttl,rabbitmq会取两者中较小者。
$arguments = [ 'x-message-ttl' => 6000, //消息在丢弃之前的可存活时间 'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机名字 'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由键 ]; $queue->setArguments($arguments);
消费者代码
//创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); //**********************创建一个用于存放死信的交换机和队列************* $deadExchangeName = 'dead_exchange'; $deadQueueName = 'delayed_order'; $deadRouteKey = 'delayed_order'; $deadExchange = new AMQPExchange($channel); $deadExchange->setName($deadExchangeName); $deadExchange->setType(AMQP_EX_TYPE_DIRECT); $deadExchange->declareExchange(); $deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declareQueue(); $deadQueue->bind($deadExchange->getName(), $deadRouteKey); //***********************创建被延迟的交换机和消息队列******************** $exchangeName = 'exchange1'; $queueName = 'order'; $routeKey = 'order'; $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 // $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $arguments = [ 'x-message-ttl' => 6000, 'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机 'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey ]; // 设置持久性 // $queue->setFlags(AMQP_DURABLE); $queue->setArguments($arguments); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 向服务器队列推送10条消息 $msg = 'hello world 1'; $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
生产者代码
$exchangeName = 'dead_exchange'; $queueName = 'delayed_order'; $routeKey = 'delayed_order'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 // $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // $queue->setArgument('x-message-ttl', 5000); // 设置持久性 // $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }