ThinkPHP 实现 RabbitMQ 常驻进程消费队列

1,项目一级目录新建一个server文件

#!/usr/bin/env php
<?php
/**
 * Command-line manager for the queue consumer processes.
 *
 * Usage: php server <start|stop|reload|monitor> [queueName]
 *
 * Without a queue name the operation applies to every queue listed under the
 * 'queue.qnames' configuration key; with one, only that queue is affected
 * (the 'monitor' operation always works on the full configured list).
 */

// Load the application bootstrap so that config() and the library classes are
// available. Any Exception raised while loading it is ignored on purpose.
// NOTE(review): presumably start.php also tries to dispatch a request, which
// fails when invoked without a route — confirm before tightening this catch.
try {
    require __DIR__ . "/start.php";
} catch (Exception $e) {}

$queueNames = config('queue.qnames');
if (empty($queueNames)) exit('队列名称未配置');

// $argv[1] is absent when the script is run without arguments; default to null
// so the switch below falls through to the usage message without a notice.
$option = isset($argv[1]) ? $argv[1] : null;
$queueName = isset($argv[2]) ? $argv[2] : null;
if ($queueName && !in_array($queueName, $queueNames, true)) exit('队列名称不存在');

// Restrict the operation to the single queue requested on the command line.
if ($queueName) {
    $queueNames = [$queueName];
}

echo "开始操作... \n";
switch ($option) {
    case 'start':
        startQueue($queueNames);
        break;

    case 'stop':
        stopQueue($queueNames);
        break;

    case 'reload':
        // Stop first: stopQueue() only returns once the consumers have exited.
        stopQueue($queueNames);
        startQueue($queueNames);
        break;

    case 'monitor':
        demon();
        break;

    default:
        echo '操作类型: start|stop|reload|monitor' . "\n";
        break;
}

echo "结束操作... \n";

/**
 * Start the "Correct" dispatcher (if it is not running) and one background
 * consumer process per queue name.
 *
 * Each process is launched with nohup through the shell; the dispatcher's
 * standard output is appended to /tmp/correct.log and each consumer's to
 * /tmp/<queueName>.log. A process that is already running is left alone.
 * The script terminates via die() if a launch command returns a non-zero
 * status.
 *
 * @param array $queueNames Queue names whose consumers should be running.
 * @return void
 */
function startQueue(array $queueNames)
{
    $correct = 'dispatch/Correct/index$';
    if ((int) isProcessExist($correct) > 0) {
        echo "$correct 进程已存在,无需重启 \n";
    } else {
        $cmd = "nohup php start.php dispatch/Correct/index >> /tmp/correct.log &";
        system($cmd, $result);
        ($result == 0) or die("$cmd 启动失败 \n");
        echo "$correct 队列已启动 \n";
    }

    foreach ($queueNames as $_queueName) {
        // Trailing "$" anchors the grep pattern to the end of the command line.
        $proccessname = "dispatch/Consume/index/qname/$_queueName$";

        // Any positive count means "running". Comparing against exactly 1
        // would treat two live consumers as "not running" and spawn a third.
        if ((int) isProcessExist($proccessname) > 0) {
            echo $_queueName . "进程已经存在,无需重启 \n";
            continue;
        }

        $command = "nohup php start.php dispatch/Consume/index/qname/" . $_queueName . " >> /tmp/$_queueName.log &";
        system($command, $result);
        ($result == 0) or die("$command 启动失败 \n");
        echo $_queueName . "队列已启动 \n";
    }
}

/**
 * Monitor pass: restart the consumer of every configured queue that has no
 * running process.
 *
 * Reads the queue list from the 'queue.qnames' configuration key and, for each
 * queue without a matching consumer process, calls startQueue() for it.
 *
 * @return void
 */
function demon()
{
    $queues = config('queue.qnames');
    if (empty($queues)) {
        return;
    }

    foreach ($queues as $queue) {
        // Probe with the same anchored pattern that startQueue()/stopQueue()
        // use. Matching on the bare queue name would count any unrelated
        // process whose command line merely contains that word.
        $processName = "dispatch/Consume/index/qname/$queue$";
        if ((int) isProcessExist($processName) > 0) {
            continue;
        }

        try {
            startQueue([$queue]);
        } catch (Exception $e) {
            echo "队列监控启动失败: " . $e->getMessage() . "\n";
        }
    }
}

/**
 * Ask the consumer of each given queue to stop and block until it has exited.
 *
 * For every queue name this writes false into the 'script:signal' Redis hash
 * under that name, pushes a 'Stop' event onto the queue, and then polls the
 * process table every 0.2 seconds until no consumer process for the queue is
 * found. There is no upper bound on the wait.
 *
 * NOTE(review): the class names libraryRedis / libraryQueue are reproduced as
 * found; they look like namespaced names that lost their separators
 * (presumably \library\Redis and \library\Queue) — confirm against the project.
 *
 * @param array $queueNames Queue names whose consumers should be stopped.
 * @return void
 */
function stopQueue(array $queueNames)
{
    $redis = libraryRedis::getInstance(config('redis'));

    foreach ($queueNames as $_queueName) {
        $redis->hset('script:signal', $_queueName, false);
        libraryQueue::getInstance()->addEvent('Stop', [])->push($_queueName);

        $pattern = "dispatch/Consume/index/qname/$_queueName$";
        do {
            usleep(200000); // 0.2s between probes
        } while (isProcessExist($pattern));

        echo $_queueName . "队列已停止 
";
    }
}

/**
 * Count the running processes whose `ps axu` line matches a grep pattern.
 *
 * The pattern is passed to grep as a basic regular expression, so callers may
 * anchor it (for example with a trailing "$"). Lines belonging to the grep
 * pipeline itself are excluded.
 *
 * @param string $processname grep pattern to look for in the process list.
 * @return int Number of matching processes; 0 when none match or when the
 *             shell command produced no output.
 */
function isProcessExist($processname)
{
    // escapeshellarg() keeps quotes or shell metacharacters in the pattern from
    // breaking out of the command; "--" stops grep from reading a pattern that
    // starts with "-" as an option.
    $ps = 'ps axu | grep -- ' . escapeshellarg($processname) . ' | grep -v "grep" | wc -l';

    // Normalise to an int: some `wc` implementations pad the count with
    // spaces, and shell_exec() yields null on failure. As raw strings both
    // would defeat the callers' truthiness and == 0 checks.
    return (int) trim((string) shell_exec($ps));
}
View Code

2,项目一级目录新建一个start.php

3,修改application/extra/queue.php

 

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

// Queue configuration returned to the framework's config loader.
return [
//    'connector' => 'Sync'
    // Names of the queues; one consumer process is managed per entry.
    'qnames'=>[
        'Trade',        // investment, cash-out and principal-return jobs
        'Activity',        // promotional-activity jobs
        'Sms',            // SMS sending jobs
        'Repay',        // borrower repayment jobs
        'Interest',     // interest accrual and interest payout jobs
        'DebtOnline',   // jobs for putting debt claims online
        'Other'            // everything else
    ],
    // Queue driver, taken from the QUEUE_DRIVER environment value with
    // 'redis' as the fallback.
    'driver' => env('QUEUE_DRIVER', 'redis'),
];
View Code
原文地址:https://www.cnblogs.com/spicy/p/7920867.html