thinkphp5.1+think-queue

最近有一个需求,A用户充值积分到钱包,但是钱包只能在一分钟之后做出响应,那么就需要异步执行查看钱包是否到账的操作,本来打算用swoole异步,突然想到think-queue,那不妨就用对列来玩玩

本文参考 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越

第一步 安装 think-queue

composer require topthink/think-queue

think-queue包地址 需要注意框架版本问题,现版本默认tp6框架

第二步 配置

安装好之后 默认会在 config文件夹下生成 queue.php配置文件

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
use thinkEnv;

return [
    'connector' => 'redis',
    "expire"=>60,//任务过期时间默认为秒,禁用为null
    "default"=>"default",//默认队列名称
    "host"=>"127.0.0.1",//Redis主机IP地址
    "port"=>6379,//Redis端口
    "password"=>"******",//Redis密码
    "select"=>5,//Redis数据库索引
    "timeout"=>0,//Redis连接超时时间
    "persistent"=>false,//是否长连接
];

这里我使用的是 redis驱动 也可以根据官方文档选择数据库驱动

第三步 编写代码

在application/index/controller下创建Jobtest.php


<?php


namespace appindexcontroller;


use thinkQueue;

class Jobtest{
    public function actionWithHelloJob(){
        $params = request()->param();
        // 1.当前任务将由哪个类来负责处理。
        //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName  = 'appindexjobHello';

        // 2.当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName        = "send";

        // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
        $data = $this->add($params);

        // 4.将该任务推送到消息队列,等待对应的消费者去执行

        // $isPushed = Queue::push( $jobHandlerClassName , $data , $jobQueueName );

        $isPushed = Queue::later(60,$jobHandlerClassName,$data,$jobQueueName); //把任务分配到队列中,延迟60s后执行

        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo '成功';
        }else{
            echo '错误';
        }
    }
    
    public function add($params){
        $data =[
            'order_no'=>$params['orderNo'],
            'msg'=>$params['orderNo'],
            'create_time'=>date('Y-m-d H:i:s'),
        ];
        Db::name('test')->insert($data);
    }
}

这里我创建了 test表来检测,你也可以根据自己的需求来创建数据库

CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `order_no` varchar(255),
  `msg` varchar(255),
  `create` varchar(255),  
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

下来我们需要在 application/index 下在创建一个job目录 在job目录下创建Hello.php文件

<?php
namespace appindexjob;
use thinkqueueJob;
use thinkDb;

class Hello {
    public function fire(Job $job,$data) {
        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            // 如果任务执行成功, 记得删除任务
            $job->delete();
        }else{
            $job->release(3); //$delay为延迟时间
        }
        if ($job->attempts() > 3) {
            //通过这个方法可以检查这个任务已经重试了几次了
            print("<warn>Hello Job has been retried more than 3 times!"."</warn>
");

            $job->delete();

            // 也可以重新发布这个任务
            //print("<info>Hello Job will be availabe again after 2s."."</info>
");
            //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
        }
    }
    
    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
    }

    /**
    * 有些消息在到达消费者时,可能已经不再需要执行了
    * @param array|mixed    $data     发布任务时自定义的数据
    * @return boolean                 任务执行的结果
    */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
    * 根据消息中的数据进行实际的业务处理...
    */
    private function doHelloJob($data)
    {
        //根据你的业务需求写逻辑即可 成功返回true失败返回false即可
    }
}

到这里代码基本就完成了,我们使用浏览器来访问我们的 Jobtest下actionWithHelloJob方法

然后在终端执行

php think queue:work --queue send 

这里的 send就是你的对列名称

执行后我们可以在redis里看到具体的数据对列 如果你没有安装redis那就需要在安装think-queue之前安装redis扩展

第四步 使用supervisor 将queue进程常驻

以下内容均来自 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越  如有侵权 可联系本人第一时间删除
1.安装supervisor

# yum install epel-release
# yum install supervisor

//设置成开机自动启动
# systemctl enable supervisord

2.配置

在这里我创建了一个命名为supervisor的目录用于存放supervisor和队列的日志文件以及include的配置文件,其目录结构为:

/var/supervisor/log/    #可以自定义
               /run/    #可以自定义
               /conf/   #可以自定义

然后找到/etc/supervisord.conf配置文件,编辑如下信息:

; 将supervisor.sock 的路径换成如下
[unix_http_server]
file=/var/supervisor/run/supervisor.sock   ; (the path to the socket file)

; 将supervisord.log 和 supervisord.pid 的路径换成如下
[supervisord]
logfile=/var/supervisor/log/supervisord.log  ; (main log file;default $CWD/supervisord.log)
pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)

; 将supervisor.sock 的路径换成如下
[supervisorctl]
serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL  for a unix socket

; 将最底部的files路径换成如下
[include]
files = /var/supervisor/conf/*.conf

/var/supervisor/conf目录里创建一个.conf文件,这里命名为queue_work.conf,内容如下:

[program:queue_worker] ;项目名称
directory = /opt/www/tp5.1 ; 程序的启动目录,项目根目录的上一级
command = php think queue:work --queue queueName --daemon ; 启动命令 queueName就是队列名
process_name=%(program_name)s_%(process_num)02d
numprocs = 3         ; 开启的进程数量
autostart = true     ; 在 supervisord 启动的时候也自动启动
startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true   ; 程序异常退出后自动重启
startretries = 3     ; 启动失败自动重试次数,默认是 3
user = root          ; 用哪个用户启动
redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20     ; stdout 日志文件备份数
; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/supervisor/log/queue_worker.log
loglevel=info

对于index这个单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来:

command = php think queue:work --queue queueName1,queueName2 --daemon ;

重启

# systemctl stop supervisord
# systemctl start supervisord

# systemctl restart supervisord

 

原文地址:https://www.cnblogs.com/we-jack/p/14023333.html