TP6(thinkphp6)队列与延时队列

安装

在此我就不再略过TP6的项目创建过程了,大致就是安装composer工具,安装成功以后,再使用composer去创建项目即可。

think-queue 安装

composer require topthink/think-queue

项目中添加驱动配置

我们需要在安装好的config下找到 queue.php

<?php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 4,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

生产者

<?php
namespace app\controller;

use app\BaseController;
use think\facade\Queue;

class Index extends BaseController
{
    public function queue()
    {
        //当前任务将由哪个类来负责处理。
        //当轮到该任务时,系统将生成一个该类的实例,并默认调用其 fire 方法
        $jobHandlerClassName = 'app\Job\Order';

        //当前任务归属的队列名称,如果为新队列,会自动创建
        //php think queue:work --queue orderJobQueue
        //php think queue:work --queue orderJobQueue --daemon

        $jobQueueName = "orderJobQueue";

        //数组数据
        $orderData = [
            'id'      => uniqid(),
            'time'    => time(),
            'message' => 'later message83'
        ];

        //将该任务推送到消息队列,等待对应的消费者去执行
        //这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系

        //立即执行
        $isPushed = Queue::push($jobHandlerClassName, $orderData, $jobQueueName);
        //延迟10秒后执行
        //$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName);

        if ($isPushed !== false) {
            echo date('Y-m-d H:i:s') . " 队列添加成功";
        } else {
            echo '队列添加失败';
        }
    }
}

消费者

<?php
namespace app\Job;

use think\facade\Log;
use think\queue\Job;

/**
 * @Title: app\task\job$Order
 * @Package package_name
 * @Description: todo(测试订单消费者)
 * @author Jack
 */
class Order
{
    /**
     * @Title: fire
     * @Description: todo(fire方法是消息队列默认调用的方法)
     * @param Job $job
     * @param array $data
     * @author Jack
     * @throws
     */
    public function fire(Job $job, array $data)
    {
        //有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        $jobId =  $job->getJobId();
        $isJobDone = $this->orders($data, $jobId);
        if ($isJobDone) {
            //如果任务执行成功,记得删除任务
            $job->delete();
        } else {
            //通过这个方法可以检查这个任务已经重试了几次了
            if ($job->attempts() > 3){
                Log::error('试了3次了');
                $job->delete();

                //也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * @Title: checkDatabaseToSeeIfJobNeedToBeDone
     * @Description: todo(有些消息在到达消费者时,可能已经不再需要执行了)
     * @param array $data
     * @return boolean
     * @author Jack
     * @throws
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * @Title: orders
     * @Description: todo(数据处理)
     * @param array $data
     * @author Jack
     * @throws
     */
    public function orders(array $data,  $jobId)
    {
        //对订单进行数据库操作或其他等等
        Log::info(date('Y-m-d H:i:s') . ' - data:' . json_encode($data));
        return true;
    }
}

服务器执行常驻命令

php think queue:work --queue orderJobQueue
原文地址:https://www.cnblogs.com/dawuge/p/15703634.html