php使用Pheanstalk实现消息队列

1.安装(linux安装beanstalkd, windows暂不支持)

# wget https://github.com/kr/beanstalkd/archive/v1.10.tar.gz

 # tar xzvf v1.10

 # cd beanstalkd-1.10/

# make && make install

# beanstalkd -v beanstalkd 1.10 2    

启动:
     进入解压的文件夹里面使用beanstalkd -l 127.0.0.1 -p 11300 &
 
2,php安装pheanstalk依赖

composer require pda/pheanstalk

(官网:https://github.com/pda/pheanstalk)

下面是自己写的封装的一个类里面的方法(类的话自己创建就可以了):

// 添加任务
    public static function addJob()
    {
        //创建队列消息

        $pheanstalk = self::const();
        print_r($pheanstalk->stats());//查看状态
        exit;
        $tubeName = 'email_list';

        $jobData = [
            'email' => '123456@163.com',
            'message' => 'Hello World !!',
            'dtime' => date('Y-m-d H:i:s'),
        ];

        $pda = $pheanstalk->useTube($tubeName)->put(json_encode($jobData));
        return $pda;
    }

    // 引用公共信息(自己写的方法连接Pheanstalk,首先在服务器开启beanstalkd)
    public static function const()
    {
        $pheanstalk = Pheanstalk::create('127.0.0.1', 11300);
        return $pheanstalk;
    }
消费 job

// 消费job
public static function Consumption()
{
$pheanstalk = self::const();
$tubeName = 'email_list';
$job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
if ($job !== false) {
$res = $pheanstalk->statsJob($job);
if ($res['reserves'] > 5) { //判断一个任务请求失败5次后直接删除
$pheanstalk->delete($job);
} else {
$job_data = $job->getData();
print_r($job_data);
exit;
self::subscribe($job_data);
$pheanstalk->delete($job);
}
/* 继续 Watch 下一个 job */
self::Consumption();
} else {
$pheanstalk->log->error('reserve false', 'reserve false');
}
}
// 添加到数据库
    public static function subscribe($job_data)
    {
        print_r($job_data); 
// 数据格式
{"time":"1212312412","email":"123456@163.com","message":"Hello World !!","dtime":"2020-09-28 15:12:14"}
    }

 

default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
  关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

4.Pheanstalk使用方法

维护方法        

    stats() 查看状态方法        

    listTubes() 目前存在的管道

    listTubesWatched() 目前监听的管道        

    statsTube() 管道的状态        

    useTube() 指定使用的管道        

    statsJob() 查看任务的详细信息        

    peek() 通过任务ID获取任务    

生产者方法        

    putInTube() 往管道中写入数据        

    put() 配合useTube()使用    

消费者方法        

    watch() 监听管道,可以同时监听多个管道        

    ignore() 不监听管道        

    reserve() 以阻塞方式监听管道,获取任务        

    reserveFromTube()        

    release() 把任务重新放回管道        

    bury() 把任务预留        

    peekBuried() 把预留任务读取出来        

    kickJob() 把buried状态的任务设置成ready        

    kick() 批量把buried状态的任务设置成ready        

    peekReady() 把准备好的任务读取出来        

    peekDelayed() 把延迟的任务读取出来        

    pauseTube() 给管道设置延迟        

    resumeTube() 取消管道延迟        

    touch() 让任务重新计算ttr时间,给任务续命

原文地址:https://www.cnblogs.com/smilevv/p/13744598.html