php模拟多线程下载

php多线程下载

上篇说到需要打包七牛文件, 所以需要先将七牛文件下载到本地。下载单个文件还是比较好实现的。

常用打开url的函数

代码写多了, 不仅会关心结果, 还会关心性能和代码的优雅。这次我希望同时下载多个文件, 而不是串行下载。主要用到了cURL函数。去官方手册看了下, 找到了, 但关于curl函数的介绍却很少,踩的坑也是一堆一堆的。

官网的例子

// 创建一对cURL资源
$ch1 = curl_init();
$ch2 = curl_init();
// 设置URL和相应的选项
curl_setopt($ch1, CURLOPT_URL, "http://www.example.com/");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch2, CURLOPT_URL, "http://www.php.net/");
curl_setopt($ch2, CURLOPT_HEADER, 0);
// 创建批处理cURL句柄
$mh = curl_multi_init();
// 增加2个句柄
curl_multi_add_handle($mh,$ch1);
curl_multi_add_handle($mh,$ch2);
$running=null;
// 执行批处理句柄
do {
usleep(10000);
curl_multi_exec($mh,$running);
} while ($running > 0);
// 关闭全部句柄
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);
curl_multi_close($mh);

上面的代码有完整的注释,但如果请求有返回该如何处理呢, 继续看手册, 找到了curl_multi_getcontent

处理请求的响应

$aURLs = array("http://www.php.net","http://www.w3cschools.com"); // array of URLs
$mh = curl_multi_init(); // init the curl Multi
$aCurlHandles = array(); // create an array for the individual curl handles
foreach ($aURLs as $id=>$url) { //add the handles for each url
    $ch = curl_setup($url,$socks5_proxy,$usernamepass);
    $ch = curl_init(); // init curl, and then setup your options
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER,1); // returns the result - very important
    curl_setopt($ch, CURLOPT_HEADER, 0); // no headers in the output

    $aCurlHandles[$url] = $ch;
    curl_multi_add_handle($mh,$ch);
}

$active = null;
//execute the handles
do {
    $mrc = curl_multi_exec($mh, $active);
} 
while ($mrc == CURLM_CALL_MULTI_PERFORM);

while ($active && $mrc == CURLM_OK) {
    if (curl_multi_select($mh) != -1) {
        do {
            $mrc = curl_multi_exec($mh, $active);
        } while ($mrc == CURLM_CALL_MULTI_PERFORM);
    }
}

/* This is the relevant bit */
 // iterate through the handles and get your content
foreach ($aCurlHandles as $url=>$ch) {
    $html = curl_multi_getcontent($ch); // get the content
            // do what you want with the HTML
    curl_multi_remove_handle($mh, $ch); // remove the handle (assuming  you are done with it);
}
/* End of the relevant bit */
curl_multi_close($mh); // close the curl multi handler

第一次看到这样的代码, 我是懵逼的, 尤其是那两个循环。相关的资料很少,还好找到一篇,我来大致梳理下循环的流程。
先说说那几个常量的意思吧

CURLMcode

  • CURLM_CALL_MULTI_PERFORM (-1) This is not really an error. It means you should call curl_multi_perform again without doing select() or similar in between. Before version 7.20.0 this could be returned by curl_multi_perform, but in later versions this return code is never used.
  • CURLM_OK (0) Things are fine.

我们来看看第一个循环

$active = null;
//execute the handles
do {
    $mrc = curl_multi_exec($mh, $active);
} 
while ($mrc == CURLM_CALL_MULTI_PERFORM);

curl_multi_exec试图加载批处理句柄的一些信息。$mh是之前通过调用curl_multi_init生成的。$active和$mrc均是整数。curl_multi_exec将$active赋值为一个用来判断操作是否仍在执行的标识的引用。也就是说,如果你用该句柄处理5个URL, curl_multi_exec当它正在处理所有的URL时, 它就会返回5,然后当每个URL完成时,$active每次将会以步长为1递减直到为0。

继续看第二个循环

while ($active && $mrc == CURLM_OK) {
    if (curl_multi_select($mh) != -1) {
        do {
            $mrc = curl_multi_exec($mh, $active);
        } while ($mrc == CURLM_CALL_MULTI_PERFORM);
    }
}

这个循环是说

(while): 只要有活动连接且$mrc OK
    (if) 如果有数据
        (do/while) 处理数据 只要系统告诉我们一直保持获取

这个循环负责检查所有的socket是否都完成了。

理解代码之后, 我实现了第一版。但仔细思考后,好像有点问题, 当请求很多时, 我同时发出所有请求, 服务器能hold住吗?跑起来之后, 会不会把资源都耗尽了。这是一个很大的并发,不太合理。我们需要自己实现一个线程池,来掌控任务进度。

我们建立一个n个线程数的线程池,我们先通过curl_multi_add_handle将n个URL添加到线程池中,每执行完毕一个任务, 就将对应的句柄资源移除,同时加入新的URL,直到所有URL一次执行完毕。别人已经做好了, 我就把重要的代码贴出来

处理多个curl请求

/**
 * Performs multiple curl requests
 *
 * @throws RollingCurlException
 * @param array $requests 需要处理的url
 * @param int $window_size 线程池的容量
 * @return bool
 */
 function rolling_curl(array $requests, $window_size = 5) {
    // make sure the rolling window isn't greater than the # of urls
    if (count($requests) < $window_size)
        $window_size = count($requests);
    if ($window_size < 2) {
        throw new RollingCurlException("Window size must be greater than 1");
    }
    $master = curl_multi_init();        
    for ($i = 0; $i < $window_size; $i++) {
        $ch = curl_init();
        $options = []; //  配置项
        curl_setopt_array($ch,$options);
        curl_multi_add_handle($master, $ch);

        // Add to our request Maps
        $key = (string) $ch;
        $this->requestMap[$key] = $i;
    }
    do {
        while(($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM);
        if($execrun != CURLM_OK) {
            break;
        }
        // 找出当前完成的请求
        while($done = curl_multi_info_read($master)) {
            // 添加新的请求前, 先将旧的删除掉
            if ($i < count(requests) && isset($requests[$i])) {
                $ch = curl_init();
                $options = []; // 配置项
                curl_setopt_array($ch,$options);
                curl_multi_add_handle($master, $ch);
                $i++;
            }

            // 删除已完成的句柄
            curl_multi_remove_handle($master, $done['handle']);

        }

        // Blocks until there is activity on any of the curl_multi connections. 防止cpu飙升
        if ($running) {
            curl_multi_select($master);
        }

    } while ($running);
    curl_multi_close($master);
    return true;
}

终于理顺了, 上面的代码能基本实现我的需求了,但像是面条代码,继续寻找社区的轮子。之前一直听说guzzle, 然后就看了下文档, 文档很清晰, 直接上手, 撸了个demo

use GuzzleHttpPsr7Request;
use GuzzleHttpClient;
use GuzzleHttpPool;
set_time_limit(0);
$client = new Client();
$urls = [
    'http://qxt-2017.cdn.xwg.cc/o_1bg5c4qca1j7vblh57qhl5aqu7.jpg',
    'http://qxt-2017.cdn.xwg.cc/o_1bg5c5p9tp08c9v18lb2u1ufvc.pptx',
    'http://qxt-2017.cdn.xwg.cc/2017-04-11_1491896251_lowb9SS8cBDjIOJ2jnIzZBDphY6s.mp4',
    'http://qxt-2017.cdn.xwg.cc/o_1bdba57vm1dps1g34igq1853mi87.docx',
    'http://qxt-2017.cdn.xwg.cc/2017-04-11_1491896395_lierbh4ZzMU8_2fSOUEUnXvgHQRo.mp4',
    'http://qxt-2017.cdn.xwg.cc/o_1befiphi67tn1bnrgf6fc1mtm7.ppt',
    'http://qxt-2017.cdn.xwg.cc/o_1befilahibnevnq1lp4170k1q9s7.xls',
    'http://qxt-2017.cdn.xwg.cc/FsGmw6A4WZvOgt-nPhFKW2pFSH1t'
];
$titles = [
    "264-141112102942604.jpg",
    '希望谷样板.pptx',
    '晓日.avi',
    '原理与发明测试.docx',
    '[dmzj][itazura_na_kiss][rv10][1280_720][12].rmvb',
    'web+of+science分析功能.ppt',
    '工-程-量-清-单-对-比-表.xls',
    'IMG_20170103_191314.jpg'
];
$requests = function () use ($urls) {
    foreach ($urls as $key => $url) {
        yield new Request('GET', $url); // [Generator syntax](http://php.net/manual/en/language.generators.syntax.php)
    }
};
$pool = new Pool($client, $requests(), [
    'concurrency' => 5,
    'fulfilled'   => function ($response, $index) use ($titles){
        file_put_contents($titles[$index], $response->getBody()->getContents()); //开始写文件
    },
    'rejected' => function ($reason, $index){
        print_r($reason); // 失败的原因
        echo $index; // 失败的索引
    },
]);
// 开始发送请求
$promise = $pool->promise();
$promise->wait();

一看很清晰, 以promise的方式来实现, 对js开发者蛮友好的, 条理也很清晰。

参考链接

原文地址:https://www.cnblogs.com/mingao/p/7478599.html