队列实现—multiprocessing.JoinableQueue
multiprocessing是python标准库中支持多进程并发的模块,我们这里采用multiprocessing中的数据结构:JoinableQueue,它本质上仍是一个FIFO的队列,它与一般队列(如queue中的Queue)的区别在于它是多进程安全的,这意味着我们不用担心它的互斥和死锁问题。JoinableQueue主要可以用来存放执行的任务和收集任务的执行结果。多进程安全的,它会自动处理“加锁”的过程,举例来看(以下皆省去导入包的过程):
def read(q): while True: try: value = q.get() print('Get %s from queue.' % value) time.sleep(random.random()) finally: q.task_done() def main(): q = multiprocessing.JoinableQueue() pw1 = multiprocessing.Process(target=read, args=(q,)) pw2 = multiprocessing.Process(target=read, args=(q,)) pw1.daemon = True pw2.daemon = True //子进程设置为守护进程——主进程结束后随之结束 pw1.start() pw2.start() //子进程就开始独立于父进程运行 for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() //主进程一直等待全部的子进程结束之后,主进程自身才结束,程序退出 except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
进程池实现——使用concurrent.futures.ProcessPoolExecutor
def read(q): print('Get %s from queue.' % q) time.sleep(random.random()) def main(): futures = set() with concurrent.futures.ProcessPoolExecutor() as executor: for q in (chr(ord('A')+i) for i in range(26)): future = executor.submit(read, q) futures.add(future) try: for future in concurrent.futures.as_completed(futures):
//这是等待所有子进程都执行完毕。子进程执行过程中可能抛出异常,err = future.exception()
可以收集这些异常,便于后期处理 err = future.exception() if err is not None: raise err except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
这里我们采用concurrent.futures.ProcessPoolExecutor对象,可以把它想象成一个进程池,子进程往里“填”。我们通过submit方法实例一个Future对象,然后把这里Future对象都填到池——futures里,这里futures是一个set对象。只要进程池里有future,就会开始执行任务。这里的read函数更为简单——只是把一个字符打印并休眠一会而已。
个人看法,线程本身就是一个有些复杂甚至可以说有些丑陋的解决方案,95%的情况下其实都可以不用——KISS。
如果要用并发处理,基本上使用 非阻塞多路复用+多进程 方式,就可以处理绝大多数需求了
线程安全
http://python.jobbole.com/87743/