52_并发编程-线程-线程池

一、新式创建进程、线程池
    
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
1 1. concurrent.futures    提供了高度封装的异步调用接口
2 2. ThreadPoolExecutor    线程池,提供异步调用
3 3. ProcessPoolExecutor   进程池,提供异步调用
名称解析
1 1. submit(fn, *args, **kwargs)    # 异步提交任务
2 2. map(func, *iterables, timeout=None, chunksize=1)    # 取代for循环submit的操作
3 3. shutdown(wait=True)    # 相当于进程池的pool.close() + pool.join()操作,wait=True,等待池内所有任务执行完毕回收资源后才继续wait=False,立即返回并不会等待池内的任务执行完毕
4                             但不管wait参数为何值,整个程序都会等到所有任务执行完毕submit和map必须在shutdown之前
5 4. result(timeout=None)   # 取结果
6 5. add_done_callback(fun) # 回调函数 
方法
 
二、实例
    
  1、基本定义
 1 import time
 2 import os
 3 import threading
 4 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
 5 
 6 def func(n):
 7     time.sleep(2)
 8     print('%s打印的:'%(threading.get_ident()),n)
 9     return n*n
10 tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
11 # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改
12 #异步执行
13 t_lst = []
14 for i in range(5):
15     t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):  可以传任意形式的参数
16     t_lst.append(t)  #
17     # print(t.result())
18     #这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
19 tpool.shutdown() #起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
20 print('主线程')
21 for ti in t_lst:
22     print('>>>>',ti.result())
23 
24 # 我们还可以不用shutdown(),用下面这种方式
25 # while 1:
26 #     for n,ti in enumerate(t_lst):
27 #         print('>>>>', ti.result(),n)
28 #     time.sleep(2) #每个两秒去去一次结果,哪个有结果了,就可以取出哪一个,想表达的意思就是说不用等到所有的结果都出来再去取,可以轮询着去取结果,因为你的任务需要执行的时间很长,那么你需要等很久才能拿到结果,通过这样的方式可以将快速出来的结果先拿出来。如果有的结果对象里面还没有执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪一个的结果,可以通过枚举enumerate来搞,记录你是哪一个位置的结果对象的结果已经被取过了,取过的就不再取了
29 
30 #结果分析: 打印的结果是没有顺序的,因为到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,但是最后的我们通过结果对象取结果的时候拿到的是有序的,因为我们主线程进行for循环的时候,我们是按顺序将结果对象添加到列表中的。
31 # 37220打印的: 0
32 # 32292打印的: 4
33 # 33444打印的: 1
34 # 30068打印的: 2
35 # 29884打印的: 3
36 # 主线程
37 # >>>> 0
38 # >>>> 1
39 # >>>> 4
40 # >>>> 9
41 # >>>> 16
基本定义

  2、map方法

 1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
 2 import threading
 3 import os,time,random
 4 def task(n):
 5     print('%s is runing' %threading.get_ident())
 6     time.sleep(random.randint(1,3))
 7     return n**2
 8 
 9 if __name__ == '__main__':
10 
11     executor=ThreadPoolExecutor(max_workers=3)
12 
13     # for i in range(11):
14     #     future=executor.submit(task,i)
15 
16     s = executor.map(task,range(1,5)) #map取代了for+submit
17     print([i for i in s])
View Code

  3、回调函数

 1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
 2 from multiprocessing import Pool
 3 import requests
 4 import json
 5 import os
 6 
 7 def get_page(url):
 8     print('<进程%s> get %s' %(os.getpid(),url))
 9     respone=requests.get(url)
10     if respone.status_code == 200:
11         return {'url':url,'text':respone.text}
12 
13 def parse_page(res):
14     res=res.result()
15     print('<进程%s> parse %s' %(os.getpid(),res['url']))
16     parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
17     with open('db.txt','a') as f:
18         f.write(parse_res)
19 
20 
21 if __name__ == '__main__':
22     urls=[
23         'https://www.baidu.com',
24         'https://www.python.org',
25         'https://www.openstack.org',
26         'https://help.github.com/',
27         'http://www.sina.com.cn/'
28     ]
29 
30     # p=Pool(3)
31     # for url in urls:
32     #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
33     # p.close()
34     # p.join()
35 
36     p=ProcessPoolExecutor(3)
37     for url in urls:
38         p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
View Code
原文地址:https://www.cnblogs.com/hq82/p/9875575.html