beanstalkd队列

 

数据结构里面的队列是先进先出的数据结构,在服务器开发中使用队列可以有效的实现任务的异步处理,把耗时的任务放在一个队列中,由消费者去自动处理,比如客户端用户把错题生成pdf文件下载,可以在客户端点击生成的时候把任务放入队列异步处理,然后直接先返回给用户结果,而不必让用户等待。

队列用处

  • 异步处理:用户注册后查找数据库发送邮件推荐商品,把推荐商品放入队列。
  • 系统解耦:上面的过程将用户注册和推荐商品分离开来了。
  • 定时任务: 把考生的试卷提交信息提前放入队列,每天固定某个时间实现自动批改选择题。

beanstalkd工具

beanstalked是一个简单的快速的通用的轻量级内存队列,可以实现百万级任务处理。

特性

  • 优先级:可以设置任务的优先级
  • 延迟: 设置任务多少秒后才允许被消费者使用
  • 持久化:定时刷新数据到文件,服务器挂掉后数据依旧存在
  • 超时会重发:消费者必须在指定时间内完成任务,否则就会重新放入管道
  • 任务预留:消费者先暂时跳过任务不处理
 
任务和管道
 
任务的状态转换

ready:任务已经准备好了,可以给消费者使用。
delay:任务放入管道的时候设置了延迟时间。
reserve:消费者把任务读取出来
buried:任务先放在一边,以后还会用
delete:任务从队列删除

安装

地址:https://github.com/kr/beanstalkd

可以通过源代码安装或者包管理工具(brew,apt-get)安装,安装完成后输入beanstalkd会由相应的命令.

 
image.png

首先使用下面的命令绑定本地地址和端口号

 beanstalkd -l 127.0.0.1 -p 11300 &

beanstalkd的类管理

可以使用pheantalkphp类操作beanstalkdpheantalk是对beanstalkd命令的封装。可以通过该类实现对任务的操作。

https://github.com/pda/pheanstalk

require 'vendor/autoload.php';
use PheanstalkPheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
//查看beanstalk的当前信息命令行的封装
print_r($pheanstalk->stats());
 
 
比如查看某个任务的信息
print_r($pheanstalk->putInTube('mytube','job',1000));
$job = $pheanstalk->watch('mytube')->reserve();
print_r( $pheanstalk->statsJob($job));
 
 

还有其他的操作如下

require 'vendor/autoload.php';

use PheanstalkPheanstalk;

//现在命令行输入  beanstalkd -l 127.0.0.1 -p 11300 & 开启

$pheanstalk = new Pheanstalk('127.0.0.1',11300);

//查看beanstalk的当前信息

// 可以通过命令行对管道进行管理,本php类对管道的管理就是对beanstalkd命令行的封装

print_r($pheanstalk->stats());

// 当前的管道
print_r($pheanstalk->listTubes());

// 查看默认的管道
print_r($pheanstalk->statsTube('default'));

//在mytube 里面放入任务 one,使用默认的优先级,延迟重发时间,超时重发时间,
$pheanstalk->useTube('mytube')->put('one');


//从管道取出任务

$job = $pheanstalk->watch('mytube')->reserve();
//根据job的编号获取信息
$job = $pheanstalk->peek(1);

$pheanstalk->delete($job);//删除任务


$pheanstalk->watch('mytube')->delete($job);
//会把任务重新放入管道,并把任务设置为ready状态
$pheanstalk->release($job);

//把任务放在一边暂时不处理,条件成熟了在读取出来
$pheanstalk->bury($job);

$pheanstalk->peekBuried('mytube');// 获取处理bury状态的任务

$pheanstalk->kickJob($job);//会把job转变成ready状态

$pheanstalk->kick(200);//把job编号小于200的变成ready状态

//--- 获取某个状态的任务
$pheanstalk->peekDelayed();
$pheanstalk->peekReady();
$pheanstalk->peekBuried();

$pheanstalk->pauseTube('mytube',10);//阻塞整个管道时间

生成者和消费者

生产者将任务放入队列

require  'vendor/autoload.php';
use PheanstalkPheanstalk;
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
$pheanstalk->useTube('mytube')->put('one',1024,10,3);
$pheanstalk->useTube('mytube')->put('two',1023);
$pheanstalk->useTube('mytube')->put('three',1025);
print_r($pheanstalk->stats());

消费者处理任务

require  'vendor/autoload.php';
use PheanstalkPheanstalk;
//终端等待表示没有生产者产生数据
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
//reserve可以设置阻塞时间,不设置会一直等待,watch 可以同时监听两个管道,
$job = $pheanstalk->watch('mytube')->watch('default')->reserve(); // reserver是获取ready的job
$pheanstalk->ignore('default');//忽略管道
//处理代码
if ($job->getData() == 'one') {
    sleep(2);// 超时重回队列3秒,睡眠两秒,delete之前的代码还能执行1秒,如果想在加
    $pheanstalk->watch('mytube')->touch($job);//续命
}

具体对队列和任务的操作可以结合实际逻辑和设置任务和管道的状态。

参考:https://www.jianshu.com/p/82c4c6fee450

原文地址:https://www.cnblogs.com/cap-rq/p/11683967.html