Learning multiprocessing

1. multiprocessing.Pool

from multiprocessing.pool import Pool

def gen_row():
    ...  # build the rows (elided)
    return rows

def main(rows):
    i = 1
    for row in rows:
        i += 1
        ...
    print(i)

if __name__ == "__main__":
    rows = gen_row()
    with Pool(4) as p:
        # four chunks, so main() is called four times
        p.map(main, (rows[:100000], rows[100000:200000],
                     rows[200000:300000], rows[300000:]))

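# This prints four values of i, one per chunk. If the second argument to map() is an iterable of length 1, only one i is printed.
# With Pool(2), passing four chunks still prints four values of i: map() calls the function once per item, whatever the pool size.
# The results with and without Pool are different, which puzzled me at first. Each call counts only its own chunk (i starts at 1 in every call), and worker processes do not share state, so Pool should not be used to operate on one shared object. With independent arguments, as in p.map(f, [1,2,3,4]), the results are consistent.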
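To make the point about map() concrete, here is a minimal sketch. The names count_rows and chunked and the stand-in data are hypothetical, not part of the original script, and the function returns its count instead of printing it so that the parent process can compare the values.

```python
from multiprocessing import Pool

def count_rows(chunk):
    # Same counting logic as main() above, but the value is returned.
    i = 1
    for _ in chunk:
        i += 1
    return i

def chunked(rows, size):
    # Split rows into consecutive slices of at most `size` items.
    return [rows[k:k + size] for k in range(0, len(rows), size)]

if __name__ == "__main__":
    rows = list(range(350000))  # stand-in data (assumption)
    chunks = chunked(rows, 100000)
    with Pool(2) as p:  # two workers, four chunks
        counts = p.map(count_rows, chunks)
    print(counts)            # [100001, 100001, 100001, 50001]
    print(count_rows(rows))  # 350001 when counted in a single call
```

There is one result per chunk even though the pool has only two workers, and each result is the chunk length plus one, which is why the numbers differ from a single call over all rows.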

The following example from the official documentation shows the benefit:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == "__main__":
    with Pool() as p:
        p.map(f, range(10000000))
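One way to check the benefit on your own machine is to time the serial and the pooled versions side by side. This is only a rough sketch: the timed helper is a hypothetical name, the input size is reduced so it finishes quickly, and no particular speedup is implied. Because f is so cheap, the cost of moving data between processes may outweigh the gain; the advantage tends to be clearer when each call does more work.

```python
import time
from multiprocessing import Pool

def f(x):
    return x * x

def timed(label, fn):
    # Run fn(), print how long it took, and return its result.
    start = time.perf_counter()
    result = fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")
    return result

if __name__ == "__main__":
    n = 1_000_000  # smaller than the 10,000,000 above (assumption)
    serial = timed("serial", lambda: [f(x) for x in range(n)])
    with Pool() as p:
        parallel = timed("Pool.map", lambda: p.map(f, range(n)))
    print(serial == parallel)  # map() returns the same values in the same order
```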

After thinking about it some more, consider the following:

import json
import MySQLdb
from multiprocessing.pool import Pool

def gen_row():
    db = MySQLdb.connect(host='192.168.1.205', user='root', passwd='123456', db='kaqu')
    c = db.cursor()
    c.execute("select params from t1")
    rows = c.fetchall()
    return rows

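def main(row):
    # called once per row by Pool.map(), so no loop over rows is needed here
    try:
        params = json.loads(row[0])  # parse the JSON only once
        latitude = float(params['latitude'])
        longitude = float(params['longitude'])
    except (IndexError, KeyError, TypeError, ValueError):
        return  # skip rows with missing or malformed coordinates
    # skip rows whose latitude is 5e-324 or 0.0
    if not (latitude == 5e-324 or latitude == 0.0):
        print(latitude, longitude)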

if __name__ == "__main__":
    rows = gen_row()
    with Pool(2) as p:
        p.map(main, rows)  # rows can be passed to map() directly
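A variant worth considering is to have the workers return the coordinates and let the parent process do all the printing, so the output does not depend on when each worker flushes its own buffer. This is a sketch under assumptions: parse_row is a hypothetical name, and the sample rows are made up to stand in for the database query.

```python
import json
from multiprocessing import Pool

def parse_row(row):
    # Return (latitude, longitude) for a usable row, otherwise None.
    try:
        params = json.loads(row[0])
        latitude = float(params["latitude"])
        longitude = float(params["longitude"])
    except (IndexError, KeyError, TypeError, ValueError):
        return None
    if latitude == 5e-324 or latitude == 0.0:
        return None
    return latitude, longitude

if __name__ == "__main__":
    # Made-up rows shaped like the one-column tuples fetchall() returns.
    rows = [
        ('{"latitude": "31.2", "longitude": "121.5"}',),
        ('{"latitude": "0.0", "longitude": "0.0"}',),
        ('not json',),
    ]
    with Pool(2) as p:
        coords = [c for c in p.map(parse_row, rows) if c is not None]
    for latitude, longitude in coords:
        print(latitude, longitude)  # printed by the parent only
```

With the sample rows above, only the first row survives the filter, so a single line is printed.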
   

Strangely:

Pool(2)

time python test2.py >> all.log
#wc -l all.log
383696 all.log

Pool(4)

time python test2.py >> all2.log
#wc -l all2.log
384183 all2.log

Pool(5)

# wc -l all.log 
383881 all.log

Pool(10)

# wc -l all.log 
383966 all.log

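The line count is different on every run! The system is a 4-core VM. My first guess was that the Pool(4) count was the reliable one and that the pool size should therefore match the number of CPU cores.

PS: The differences above are actually caused by not waiting for the worker processes to finish. Leaving the with block calls terminate() on the pool, which can stop the workers before they have flushed their buffered output to the redirected file. Calling close() and then join() fixes it; see the example below.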
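Applied to the with-statement form used earlier, the fix might look like the sketch below. The emit function is a hypothetical stand-in for main(); the relevant part is the close() and join() calls placed inside the with block, before it exits.

```python
from multiprocessing import Pool

def emit(n):
    # Each worker prints one line; the text may sit in a buffer for a while.
    print("line", n)
    return n

if __name__ == "__main__":
    with Pool(4) as p:
        results = p.map(emit, range(1000))
        p.close()  # no more tasks will be submitted
        p.join()   # wait for the workers to exit normally
    print(len(results))
```

Run with the output redirected to a file, this should give the same line count from run to run, since the workers exit on their own rather than being terminated.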

Summary:

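 As in the official example, Pool() can be called without its first argument; the number of worker processes then defaults to the number of CPUs.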
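A quick way to see the default in action is to ask each task which process ran it. This is a small sketch with a hypothetical helper, worker_pid; the number of distinct PIDs is at most the number of workers, and may be lower if some workers happen to receive no items.

```python
import os
from multiprocessing import Pool

def worker_pid(_):
    # Report which process handled this item.
    return os.getpid()

if __name__ == "__main__":
    print("CPUs reported:", os.cpu_count())
    with Pool() as p:  # no first argument
        pids = set(p.map(worker_pid, range(1000)))
    print("distinct worker PIDs seen:", len(pids))
```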

Example:

import multiprocessing as mul
import time

def f(number):
    time.sleep(1)
    return number + 1


if __name__ == '__main__':
    sequence = list(range(4))
    p = mul.Pool()
    print(p.map(f, sequence))
    p.close()
    p.join()

Example 2:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(8):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Output:

Parent process 11756.
Waiting for all subprocesses done...
Run task 0 (11048)...
Run task 1 (13032)...
Run task 2 (6736)...
Run task 3 (8884)...
Task 3 runs 0.50 seconds.
Run task 4 (8884)...
Task 2 runs 1.03 seconds.
Run task 5 (6736)...
Task 1 runs 1.19 seconds.
Run task 6 (13032)...
Task 0 runs 2.86 seconds.
Run task 7 (11048)...
Task 4 runs 2.69 seconds.
Task 6 runs 2.22 seconds.
Task 5 runs 2.76 seconds.
Task 7 runs 2.10 seconds.
All subprocesses done.

Only four child processes were created (the same four PIDs handle all eight tasks) because the pool was created with Pool(4).
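The example above discards what the tasks return. If the return values are needed, each apply_async() call gives back a result object whose get() method blocks until that task has finished. The following is a minimal sketch; square_with_pid is a hypothetical task, not part of the original example.

```python
from multiprocessing import Pool
import os

def square_with_pid(n):
    # Return the result together with the PID of the worker that computed it.
    return n * n, os.getpid()

if __name__ == "__main__":
    with Pool(4) as p:
        pending = [p.apply_async(square_with_pid, args=(i,)) for i in range(8)]
        results = [r.get() for r in pending]  # wait for each task in turn
    squares = [square for square, _ in results]
    pids = {pid for _, pid in results}
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
    print(len(pids), "distinct worker PIDs (at most 4)")
```

Since get() has already waited for every task, the with block can exit safely here without a separate join().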

PS: Based on Liao Xuefeng's Python tutorial.

Original article: https://www.cnblogs.com/bvac/p/5396599.html