异步进程 multiprocessing 模板

 1 import os
 2 from multiprocessing import Pool, Manager
 3 
 4 
 5 def handle_task(arg1, arg2):
 6     lock.acquire()
 7     # get from queuing_task_lst
 8     # enqueue running_task_dt
 9     proc_id = os.getpid()
10     lock.release()
11 
12 
13 def async_deal_task_queue():
14     # query
15     queuing_task_lst.append(123)
16 
17 
18 def init(mgr_lock):
19     global lock
20     lock = mgr_lock
21 
22 
23 if __name__ == '__main__':
24     manager = Manager()
25     lock = manager.Lock()
26     queuing_task_lst = manager.list()
27     running_task_dt = manager.dict()
28 
29     proc_pool = Pool(processes=10, initializer=init, initargs=(lock,))
30     for i in range(0, 10):
31         try:
32             proc_pool.apply_async(handle_task, args=(queuing_task_lst, running_task_dt))
33         except KeyboardInterrupt as err:
34             proc_pool.terminate()
35         except Exception as e:
36             proc_pool.terminate()
37     async_deal_task_queue()
38     try:
39         proc_pool.close()
40         proc_pool.join()
41     except Exception:
42         pass
43     finally:
44         pass

> 说明:

上述代码中,handle_task进程和async_deal_task_queue进程异步执行
async_deal_task_queue作为生产者,往全局队列 queuing_task_lst 放入生产的产品
handle_task 从 queuing_task_lst 中拿到产品进行消费
消费方式是一次性将生产的产品存多个,然后再一个一个进行处理
处理过程中使用了全局锁,通过 Pool的参数 initializer 及 initargs 来实现

> 细节技术:

manager.dict() 不同于一般意义上python 的 dict, 他的取法有如下特殊点:
取key只能用 run_task_dt.keys()
获取value用 tid_lst = run_task_dt[key]
赋值能用 run_task_dt[queue_name] = value, 可变对象记得deepcopy
manager要使用 multiprocessing下的manager

还可以参考: https://thief.one/2016/11/24/Multiprocessing%E5%AD%90%E8%BF%9B%E7%A8%8B%E8%BF%94%E5%9B%9E%E5%80%BC/

原文地址:https://www.cnblogs.com/zhanghaibin16/p/13322056.html