rabbitmq 延时队列

前言

某个产品 或者订单,有个有效期 过了有效期要取消

方法一 : 写个脚本,用crontab 定时扫描 改变状态 但是最低只能一分钟 ,不适合

方法二 : 用swoole得毫秒定时器,每秒钟去扫描表 明显占用资源 mysql受不了 

方法三 :用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费  更改表状态 

php 安装mq扩展

https://www.cnblogs.com/brady-wang/p/7662393.html

搭建mq服务

https://www.cnblogs.com/brady-wang/p/7660174.html

创建生产者和消费者

生产者  publish.php

<?php

header('Content-Type:text/html;charset=utf8;');
$time = 30;

$params = array(
	'exchangeName' => 'test_cache_exchange'."_".$time,
	'queueName' => 'test_cache_queue'."_".$time,
	'routeKey' => 'test_cache_route'."_".$time,
);

$connectConfig = array(
	'host' => '127.0.0.1',
	'port' => 5672,
	'login' => 'admin',
	'password' => 'password',
	'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));
//
//exit();
try {
	$conn = new AMQPConnection($connectConfig);
	$conn->connect();
	if (!$conn->isConnected()) {
		//die('Conexiune esuata');
		//TODO 记录日志
		echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
		exit();
	}
	$channel = new AMQPChannel($conn);
	if (!$channel->isConnected()) {
		// die('Connection through channel failed');
		//TODO 记录日志
		echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
		exit();
	}
	$exchange = new AMQPExchange($channel);
	$exchange->setFlags(AMQP_DURABLE);//持久化
	$exchange->setName($params['exchangeName']?:'');
	$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
	$exchange->declareExchange();

	//$channel->startTransaction();

	$queue = new AMQPQueue($channel);
	$queue->setName($params['queueName']?:'');
	$queue->setFlags(AMQP_DURABLE);

	// 和普通生产者区别 在这 下面是过期时间和转发到的路由
	$queue->setArguments(array(
		'x-dead-letter-exchange' => 'delay_exchange',
		'x-dead-letter-routing-key' => 'delay_route',
		'x-message-ttl' => $time*1000,
	));
	$queue->declareQueue();

	//绑定
	$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {

}


//$num = mt_rand(100, 500);
$num = 1;

//生成消息
$exchange->publish(date("Y-m-d H:i:s"), $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));

  

消费者 consumer.php

<?php

header('Content-Type:text/html;charset=utf8;');


$params = array(
	'exchangeName' => 'delay_exchange',
	'queueName' => 'delay_queue',
	'routeKey' => 'delay_route',
);

$connectConfig = array(
	'host' => 'localhost',
	'port' => 5672,
	'login' => 'admin',
	'password' => 'password',
	'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
	$conn = new AMQPConnection($connectConfig);
	$conn->connect();
	if (!$conn->isConnected()) {
		//die('Conexiuneesuata');
//TODO记录日志
		echo 'rabbit-mq连接错误:', json_encode($connectConfig);
		exit();
	}
	$channel = new AMQPChannel($conn);
	if (!$channel->isConnected()) {
		//die('Connectionthroughchannelfailed');
//TODO记录日志
		echo 'rabbit-mqConnectionthroughchannelfailed:', json_encode($connectConfig);
		exit();
	}
	$exchange = new AMQPExchange($channel);
	$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
	$exchange->setName($params['exchangeName'] ?: '');
	$exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型
	$exchange->declareExchange();

//$channel->startTransaction();

	$queue = new AMQPQueue($channel);
	$queue->setName($params['queueName'] ?: '');
	$queue->setFlags(AMQP_DURABLE);
	$queue->declareQueue();

//绑定
	$queue->bind($params['exchangeName'], $params['routeKey']);
} catch (Exception$e) {
	echo $e->getMessage();
	exit();
}

function callback(AMQPEnvelope $message){
	global $queue;
	if ($message) {
		$body = $message->getBody();
		echo $body . PHP_EOL;
		$queue->ack($message->getDeliveryTag());
	} else {
		echo 'nomessage' . PHP_EOL;
	}
}
 
//$queue->consume('callback');第一种消费方式,但是会阻塞,程序一直会卡在此处
 
//第二种消费方式,非阻塞
$start = time();
while (true) {
	$message = $queue->get();
	if (!empty($message)) {
		echo $message->getBody()."--失效时间 ".date("Y-m-d H:i:s"). PHP_EOL;
		$queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费
//		$end = time();
//		echo '<br>' . ($end - $start);

	} else {
		//echo'messagenotfound'.PHP_EOL;
	}
}

  

执行推送 我改了不同时间推送,会生成不同的交换机 路由 队列,因为我用得是direct类型  要一一匹配

消费者开启 

[root@localhost mq]# php consumer.php 
2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42
2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42
2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43
2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43
2020-07-18 11:13:04--失效时间 2020-07-18 11:13:24

2020-07-18 11:21:00--失效时间 2020-07-18 11:21:10
2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02
2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02
2020-07-18 11:21:22--失效时间 2020-07-18 11:22:12
2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13
2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13

  

发现正常,都是我设置的事件过期后就到处理队列,在这里消费,处理逻辑即可 

 参考 https://www.cnblogs.com/Zhangcsc/p/11739754.html

https://blog.csdn.net/weixin_34310369/article/details/92262465?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2

原文地址:https://www.cnblogs.com/brady-wang/p/13335104.html