php Pthread 多线程 (六) Pool类 线程池

Pool对象是多个Worker对象的容器,同时也是它们的控制器,对Worker功能更高抽象。
比如Worker是河,而线程是运行在河里的船。Pool则是管理着多条河。
<?php
//继承Collectable垃圾收集类,好让Pool::collect进行收集 
class Sql extends Collectable {
    private $sql = '';
    private $data = array();

    public function __construct($sql) {
        $this->sql = $sql;
    }

    public function run() {
        $db = $this->worker->getDb();
        $res = mysql_query($this->sql, $db);
        $tmp = array();
        while($row = mysql_fetch_assoc($res)) {
            //这里不能使用$this->data[] = $row;这种方式。
            $tmp[] = $row;
        }
        $this->data = $tmp;
        //这里工作完后,设置为垃圾
        //在Pool::collect中isGarbage()判断时则为真
        $this->setGarbage();
    }

    public function getData() {
        return $this->data;
    }
}

class SqlWorker extends Worker {
    protected static $db = null;

    public function getDb() {
        if(!self::$db) {
            self::$db = mysql_connect('127.0.0.1', 'root', '');
            mysql_select_db('test', self::$db);
        }
        return self::$db;
    }
}

//这里创建5个Worker对象的Pool线程池
$pool = new Pool(5, 'SqlWorker');

//我们创建20个Sql线程对象,并提交到Pool的Worker中
$sqls = array();
for($ix = 0; $ix < 20; ++$ix) {
    $sql = new Sql("select * from test order by id limit {$ix},1");
    $sqls[] = $sql;

    //$pool->submit($sql);不要在这里submit
}

//注意,这里循环提交$sql
//如果把$pool->submit放到前面的for循环内,会出现一个错误
//第一个Sql对象的sql语句会跟最后一个相同,导致结果出现问题
foreach($sqls as $sql) {
    //这里的submit方法有问题,它会修改$sqls
    //导致第一个Sql对象与最后一个相同
    //不知是不是BUG
    $pool->submit($sql);
}

//等待队列中执行完成
$pool->shutdown();

$ret = array();
foreach($sqls as $sql) {
    $ret[] = $sql->getData();
}
file_put_contents('ret.txt', var_export($ret, true));

//回收已完成的对象
$pool->collect(function($sql){
    return $sql->isGarbage();
});
array (
  0 => 
  array (
    0 => 
    array (
      'id' => '20',
      'name' => 'mmm',
    ),
  ),
  1 => 
  array (
    0 => 
    array (
      'id' => '2',
      'name' => '222',
    ),
  ),
  2 => 
  array (
    0 => 
    array (
      'id' => '3',
      'name' => '333',
    ),
  ),
  3 => 
  array (
    0 => 
    array (
      'id' => '4',
      'name' => '444',
    ),
  ),
  4 => 
  array (
    0 => 
    array (
      'id' => '5',
      'name' => '555',
    ),
  ),
  5 => 
  array (
    0 => 
    array (
      'id' => '6',
      'name' => '666',
    ),
  ),
  6 => 
  array (
    0 => 
    array (
      'id' => '7',
      'name' => '777',
    ),
  ),
  7 => 
  array (
    0 => 
    array (
      'id' => '8',
      'name' => '888',
    ),
  ),
  8 => 
  array (
    0 => 
    array (
      'id' => '9',
      'name' => '999',
    ),
  ),
  9 => 
  array (
    0 => 
    array (
      'id' => '10',
      'name' => 'aaa',
    ),
  ),
  10 => 
  array (
    0 => 
    array (
      'id' => '11',
      'name' => 'bbb',
    ),
  ),
  11 => 
  array (
    0 => 
    array (
      'id' => '12',
      'name' => 'ccc',
    ),
  ),
  12 => 
  array (
    0 => 
    array (
      'id' => '13',
      'name' => 'ddd',
    ),
  ),
  13 => 
  array (
    0 => 
    array (
      'id' => '14',
      'name' => 'eee',
    ),
  ),
  14 => 
  array (
    0 => 
    array (
      'id' => '15',
      'name' => 'fff',
    ),
  ),
  15 => 
  array (
    0 => 
    array (
      'id' => '16',
      'name' => 'ggg',
    ),
  ),
  16 => 
  array (
    0 => 
    array (
      'id' => '17',
      'name' => 'vvv',
    ),
  ),
  17 => 
  array (
    0 => 
    array (
      'id' => '18',
      'name' => 'hhh',
    ),
  ),
  18 => 
  array (
    0 => 
    array (
      'id' => '19',
      'name' => 'nnn',
    ),
  ),
  19 => 
  array (
    0 => 
    array (
      'id' => '20',
      'name' => 'mmm',
    ),
  ),
)
从结果可以看出,第一条记录跟最后一条是相同的,再没有pool->submit之前$sqls数组中的对象都是正确的,submit之后第一个对象的数据就改变了,不知道是不是pthreads的BUG。
 
上述代码我们通过创建一个包含5个SqlWorker对象的pool,然后创建20个Sql对象加入到pool中。
 
当然我们的Sql类并不一定非要继承自Collectable类,我们也可自定义判断什么时候可回收。
<?php
//继承Threaded类,Threaded提供了隐式的线程安全机制
//这个对象中的所有操作都是线程安全的
class MyWork extends Threaded {
    private $name = '';
    private $do = false;
    private $data = '';

    public function __construct($name) {
        $this->name = $name;
    }

    public function run() {
        $this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] 
";
        //通过do来判断是否完成
        //如果为true,则让Pool::collect回收
        $this->do = true;
    }

    public function isDo() { 
        return  $this->do;
    }

    public function getData() {
        return $this->data;
    }
}

class MyWorker extends Worker {
    public static $name = 0;

    public function __construct() {
        self::$name++;
    }

    public function run() {
        
    }

    public function getName() {
        return self::$name;
    }
}

$pool = new Pool(5, 'MyWorker');

$works = array();
for($ix = 0; $ix < 20; ++$ix) {
    $work = new MyWork($ix);
    $works[] = $work;
}

foreach($works as $work) {
    $pool->submit($work);
}
$pool->shutdown();

foreach($works as $work) {
    echo $work->getData();
}

//回收已完成的对象
$pool->collect(function($work){
    //我们通过自定义函数isDo来判断对象是否执行完毕
    return $work->isDo();
});
执行结果如下:
19 run... in thread [5]
1 run... in thread [2]
2 run... in thread [3]
3 run... in thread [4]
4 run... in thread [5]
5 run... in thread [1]
6 run... in thread [2]
7 run... in thread [3]
8 run... in thread [4]
9 run... in thread [5]
10 run... in thread [1]
11 run... in thread [2]
12 run... in thread [3]
13 run... in thread [4]
14 run... in thread [5]
15 run... in thread [1]
16 run... in thread [2]
17 run... in thread [3]
18 run... in thread [4]
19 run... in thread [5]
第一条记录为什么会有问题,前面我已经说过了。这里我们看20个MyWork对象,它们顺序的加入到5个MyWorker对象中,如果第一条记录没有问题的话,它们分别加入到1,2,3,4,5的MyWorker中,然后远行run方法。
 
 
 
原文地址:https://www.cnblogs.com/jkko123/p/6294592.html