Python: Multiprocessing

Process

Create a child process to run a given function.

>>> from multiprocessing import Process,current_process
>>> 
>>> def test(*args,**kwargs):
...     p = current_process()
...     print p.name,p.pid
...     print args
...     print kwargs
... 
>>> 
>>> p = Process(target=test,args=(1,2),kwargs={"a":"hello"},name="TEST")

>>> p.start();p.join();
TEST 24796
(1, 2)
{'a': 'hello'}

Help documentation:

    class Process(__builtin__.object)
     |  Process objects represent activity that is run in a separate process
     |  
     |  The class is analagous to `threading.Thread`
     |  
     |  Methods defined here:
     |  
     |  __init__(self, group=None, target=None, name=None, args=(), kwargs={})
     |  
     |  __repr__(self)
     |  
     |  is_alive(self)
     |      Return whether process is alive
     |  
     |  join(self, timeout=None)
     |      Wait until child process terminates
     |  
     |  run(self)
     |      Method to be run in sub-process; can be overridden in sub-class
     |  
     |  start(self)
     |      Start child process
     |  
     |  terminate(self)
     |      Terminate process; sends SIGTERM signal or uses TerminateProcess()
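
start() creates the child process, and the new process then calls run() to execute the target function. The constructor arguments args and kwargs are passed on to the target function. In the parent, call join() to wait for the child and collect its exit status; otherwise a zombie process is left behind, unless the parent terminates first.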
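
The join()/exit-status relationship can be sketched as follows. This is a minimal illustration in Python 3 syntax (the listings in this post target Python 2), and the names child and run_and_collect are made up for the example.

```python
import sys
from multiprocessing import Process


def child(code):
    # Hypothetical target: leave the child process with a chosen exit status.
    sys.exit(code)


def run_and_collect(code):
    p = Process(target=child, args=(code,))
    p.start()          # the child process is created here
    p.join()           # wait for the child and reap it
    return p.exitcode  # exit status, available once the child has finished


if __name__ == "__main__":
    print(run_and_collect(3))  # prints 3
```

Before join() returns (or while the child is still running), exitcode is None.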
 
As the following example shows, __init__() runs in the parent process, while run() already runs in the child.
 
[root@typhoeus79 20131104]# more myprocess.py 
#!/usr/bin/env python26
#-*- coding:utf-8 -*-
import os
from multiprocessing import Process,current_process

class MyProcess(Process):
    def __init__(self):
        print "init:",os.getpid()  # still the parent process
        super(MyProcess,self).__init__()

    def run(self):
        print "run:",os.getpid()  # the child process


if  __name__ == '__main__':
    print "parent:",os.getpid()
    p = MyProcess()
    p.start()
    p.join()

[root@typhoeus79 20131104]# ./myprocess.py 
parent: 17213
init: 17213
run: 17216
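
A related point worth sketching: run() only executes in a child when it is invoked through start(); calling run() directly executes it in the calling process. The sketch below uses Python 3 syntax, and PidReporter is a hypothetical class that sends the PID seen inside run() back through a Queue so the parent can compare it with its own.

```python
import os
from multiprocessing import Process, Queue


class PidReporter(Process):
    """Hypothetical subclass that reports the PID seen inside run()."""

    def __init__(self, out):
        super().__init__()
        self.out = out
        self.init_pid = os.getpid()  # __init__ runs in the creating process

    def run(self):
        self.out.put(os.getpid())    # runs in the child when started via start()


if __name__ == "__main__":
    q = Queue()
    p = PidReporter(q)
    p.start()
    run_pid = q.get()
    p.join()
    print("init in parent:", p.init_pid == os.getpid())
    print("run in child:", run_pid != os.getpid())
```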

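A child process does not call exit functions, and only daemon processes are sent the termination signal when the main process exits; the default handling, naturally, is for the child to terminate. In addition, a daemon process is not allowed to create child processes of its own, because they would be left orphaned when the daemon is terminated.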

[root@typhoeus79 20131104]# more myprocess2.py 
#!/usr/bin/env python26
#-*- coding:utf-8 -*-

import os
from time import sleep
from signal import signal,SIGTERM
from multiprocessing import Process

def test():
    def handler(signum,frame):
        print "child exit.",os.getpid()
        exit(0)

    signal(SIGTERM,handler)
    print "child start:",os.getpid()

    while True:
        print "sleeping..."
        sleep(1)

if __name__ == "__main__":
    p = Process(target = test)
    p.daemon = True  # must be set explicitly, and before start(); it marks the child as a daemon process. Without it the child keeps printing "sleeping..." forever
    p.start()

    sleep(2)  # give the child a moment to get going
    print "parent exit."

[root@typhoeus79 20131104]# ./myprocess2.py    
child start: 22402
sleeping...
sleeping...
parent exit.
child exit. 22402

Calling terminate() forcibly ends the child process immediately (no cleanup is performed). Related state is available through is_alive(), pid, and exitcode.
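
A minimal sketch of these state accessors around terminate(), written in Python 3 syntax. The helpers describe_exitcode and sleeper are hypothetical names introduced for the example; the sketch relies on exitcode being None while the child runs and negative when the child was killed by a signal.

```python
import signal
import time
from multiprocessing import Process


def describe_exitcode(code):
    # Hypothetical helper that turns Process.exitcode into a readable string.
    if code is None:
        return "still running"
    if code < 0:
        return "killed by signal %d" % -code
    return "exited with status %d" % code


def sleeper():
    time.sleep(60)


if __name__ == "__main__":
    p = Process(target=sleeper)
    p.start()
    print(p.pid, p.is_alive(), describe_exitcode(p.exitcode))
    p.terminate()  # on POSIX this sends SIGTERM to the child
    p.join()       # reap the child so exitcode gets filled in
    print(p.is_alive(), describe_exitcode(p.exitcode))
```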

Pool

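A process pool. It runs functions on a set of reusable daemon worker processes; by default the number of workers equals the number of CPU cores.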

[root@typhoeus79 20131104]# more process_pool.py 
#!/usr/bin/env python26
#-*- coding:utf-8 -*-
from multiprocessing import Pool

def test(*args,**kwargs):
        print args
        print kwargs
        return 123

if __name__ == "__main__":
    pool = Pool()
    print pool.apply(test,range(3),dict(a=1,b=2))

    pool.terminate()
    pool.join()
[root@typhoeus79 20131104]# ./process_pool.py    
(0, 1, 2)
{'a': 1, 'b': 2}
123

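Before calling join() to wait for all worker processes to finish, the pool must be shut down with close() or terminate(). close() prevents new tasks from being submitted and tells the workers to exit once all outstanding tasks are complete. It returns immediately and does not block.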
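
The close()-then-join() ordering can be sketched like this, in Python 3 syntax. square and run_batch are hypothetical names; the point is only that tasks submitted before close() still run to completion.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def run_batch(values, workers=2):
    pool = Pool(workers)
    pending = [pool.apply_async(square, (v,)) for v in values]
    pool.close()  # no new tasks; workers exit after the queued ones finish
    pool.join()   # block until every worker process has exited
    return [r.get() for r in pending]


if __name__ == "__main__":
    print(run_batch([1, 2, 3]))  # prints [1, 4, 9]
```

Replacing close() with terminate() here would stop the workers without completing outstanding work, so the results could no longer be relied on.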

With the asynchronous model, the callback is optional.

[root@typhoeus79 20131104]# more callback.py 
#!/usr/bin/env python26
#-*- coding:utf8 -*-

from multiprocessing import Pool
from time import sleep

def test(*args,**kwargs):
    print "in testing"
    print "sleeping..."
    sleep(2)

    print "test returning..."
    return 123

def callback(ret):

    print "callbacking..."
    sleep(2)
    print "return:",ret

if __name__ == "__main__":
    pool = Pool()
    pool.apply_async(test,callback=callback)

    print "pooling..."
    print
    print

    ar = pool.apply_async(test)  # apply_async returns an AsyncResult instance

    print
    print ar.get()  # get([timeout]), wait(), successful() etc. report a task's status and result

    pool.close()
    pool.join()
[root@typhoeus79 20131104]# ./callback.py    
pooling...


in testing
sleeping...

in testing
sleeping...
test returning...
test returning...
callbacking...
return: 123
123

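The first task's result is never fetched with get(); it is delivered through the callback. The second task's result is fetched with ar.get(), which blocks until it is ready.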
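
The AsyncResult methods can be sketched as follows, in Python 3 syntax. slow_square and its delay parameter are made up for the example, and the timings are only illustrative.

```python
import multiprocessing
import time


def slow_square(x, delay=0.5):
    # Hypothetical task that takes a while to finish.
    time.sleep(delay)
    return x * x


if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    ar = pool.apply_async(slow_square, (7,))
    try:
        ar.get(timeout=0.01)  # almost certainly too early
    except multiprocessing.TimeoutError:
        print("not ready yet, ready() =", ar.ready())
    ar.wait()                 # block until the task has finished
    print(ar.ready(), ar.successful(), ar.get())
    pool.close()
    pool.join()
```

successful() should only be called once the result is ready, which is why the sketch calls wait() first.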

map() and imap() run a function over a batch of inputs, returning a list and an iterator respectively.

[root@typhoeus79 20131104]# more process_map.py 
#!/usr/bin/env python26
#-*- coding:utf-8 -*-

from multiprocessing import Pool,current_process

def test(x):
    print current_process().pid, x  # pid is an attribute of the object returned by current_process()
    return x + 100

def test2(s):
    print current_process().pid, s

if __name__ == "__main__":
    p = Pool(3)

    print p.map(test,xrange(5))
    p.map(test2,"abc")
[root@typhoeus79 20131104]# ./process_map.py    
5402 0
5403 1
5402 3
5402 4
5404 2
[100, 101, 102, 103, 104]
5402 a
5402 b
5402 c

As the output shows, only three distinct process IDs appear.
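
The listing above only exercises map(). A minimal sketch of the iterator variant, in Python 3 syntax, might look like this; add_100 is a hypothetical stand-in for test().

```python
from multiprocessing import Pool


def add_100(x):
    return x + 100


if __name__ == "__main__":
    pool = Pool(3)
    it = pool.imap(add_100, range(5))  # returns an iterator, not a list
    print(next(it))                    # results come back in input order
    print(list(it))                    # the remaining results
    # imap_unordered yields results as they complete, so sort for display
    print(sorted(pool.imap_unordered(add_100, range(5))))
    pool.close()
    pool.join()
```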

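The chunksize parameter sets the size of the chunks the input is split into; if there is a lot of data to process, raising it is recommended.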

if __name__ == "__main__":
    p = Pool(5)

    print p.map(test,xrange(10),chunksize=2)
    p.map(test2,"abc")

Output:

6796 0
6796 1
6797 2
6797 3
6798 4
6798 5
6797 8
6799 6
6797 9
6799 7
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
6796 b
6800 a
6798 c
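
One way to picture chunksize: the input is cut into consecutive groups of that size, and each group is handed to a worker as a single task. That matches the output above, where each PID handles adjacent pairs. The sketch below is a simplified illustration in Python 3 syntax, not the pool's actual code; split_into_chunks is a hypothetical helper.

```python
from itertools import islice


def split_into_chunks(iterable, chunksize):
    # Yield consecutive tuples of at most `chunksize` items.
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunksize))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    for chunk in split_into_chunks(range(10), 2):
        print(chunk)
```

Larger chunks mean fewer hand-offs between the parent and the workers, at the cost of coarser load balancing.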

Queue

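Queue is the most commonly used way to exchange data between processes. The maxsize parameter limits the number of items in the queue, which affects blocking operations such as get() and put(). By default there is no limit.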
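
The effect of maxsize can be sketched in a single process, in Python 3 syntax (where the Full and Empty exceptions live in the queue module rather than Queue). fill_until_full is a hypothetical helper and expects a positive maxsize.

```python
import queue
from multiprocessing import Queue


def fill_until_full(maxsize):
    # Put items without blocking until the queue refuses another one.
    # Assumes maxsize > 0; an unbounded queue would never raise Full.
    q = Queue(maxsize=maxsize)
    accepted = 0
    try:
        while True:
            q.put(accepted, block=False)
            accepted += 1
    except queue.Full:
        pass
    # Drain the queue again; a timeout guards against waiting forever.
    items = [q.get(timeout=5) for _ in range(accepted)]
    return accepted, items


if __name__ == "__main__":
    print(fill_until_full(2))  # prints (2, [0, 1])
```

With the default block=True, the third put() would wait for a consumer to make room instead of raising.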

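JoinableQueue is usually used directly; internally it coordinates with a Semaphore. put() and task_done() adjust the semaphore counter. When task_done() finds that the count has reached 0, it immediately notifies join() to stop blocking.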

[root@typhoeus79 20131104]# more test_queue.py 
#!/usr/bin/env python26
#-*- coding:utf-8 -*-

from Queue import Empty
from multiprocessing import Process,current_process,JoinableQueue

def test(q):
    pid = current_process().pid

    while True:
        try:
            d = q.get(timeout=2)  # block with a timeout, to allow for a slow producer

            print pid,d
            q.task_done()
        except Empty:
            print pid,"empty!"
            break

if __name__ == "__main__":
    q = JoinableQueue(maxsize=1000)

    map(q.put,range(5))  # stays within the queue's capacity, so this does not block
    print "put over!"

    for i in range(3):  # create several consumers
        Process(target=test,args=(q,)).start()

    q.join()  # wait until every task has been marked done
    print "task done"
[root@typhoeus79 20131104]# ./test_queue.py 
put over!
16768 0
16768 1
16768 2
16768 3
16768 4
task done
16770 empty!
16769 empty!
16768 empty!
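
The task counting behind put(), task_done() and join() can be observed in a single process. This is a minimal sketch in Python 3 syntax; extra_task_done_is_rejected is a hypothetical helper name.

```python
from multiprocessing import JoinableQueue


def extra_task_done_is_rejected(n_items):
    q = JoinableQueue()
    for i in range(n_items):
        q.put(i)            # each put() adds one unfinished task
    for _ in range(n_items):
        q.get(timeout=5)
        q.task_done()       # each task_done() removes one
    q.join()                # the count is zero, so this returns at once
    try:
        q.task_done()       # one call too many
    except ValueError:
        return True
    return False


if __name__ == "__main__":
    print(extra_task_done_is_rejected(3))  # prints True
```

Calling task_done() more often than put() is reported as a ValueError, which is a quick way to catch consumers that acknowledge the same item twice.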
Original article: https://www.cnblogs.com/gsblog/p/3406924.html