python笔记-8(线程,进程,协程,io多路复用)

线程进程概要:
 1、最小工作单元为线程
2、一个应用程序至少有一个进程,一个进程里至少有一个线程
3、应用场景:
IO密集型适合用线程
计算密集型适合用进程
4、python中GIL全局解释器锁,保证同一个进程中只能有一个线程同时被调用

进程、线程:
  1、进程内存独立,线程共享同一进程的资源
  2、进程是资源的集合,线程是执行单位
  3、进程之间不能直接互相访问,同一进程内的线程可以互相通信
  4、创建新进程会消耗资源,线程非常轻量,只保存线程运行时的必要数据,如上下文,程序堆栈
  5、同一进程里的线程可以互相控制,不同进程之间不能相互控制,但是父进程可以控制子进程


线程
线程模块:threading

创建一个线程:t = threading.Thread(target=,args=[])
target:函数或类名
args:必须是可迭代的,如列表、元组
运行该线程: t.start()表示已经准备好等待cpu调度

1、线程的基本使用:
 1 import threading,time
 2     
 3 def task(args):
 4     time.sleep(1)
 5     print(args)
 6 
 7 for i in range(20):
 8     t = threading.Thread(target=task,args=[i,])
 9     t.start()
10 
11 print("end")   #主线程
1、线程的基本使用:

2、线程的调用:

 1 import threading,time
 2 def task(args):
 3     time.sleep(args)
 4     print(args)
 5 
 6 for i in range(20):
 7     t = threading.Thread(target=task,args=[i,])
 8     # t.setDaemon(True) #主线程不等待子线程结束
 9     # t.setDaemon(False)#默认值,主线程等子线程全部执行完再结束
10                                    #setDaemon()只能设置在start上方
11     t.start()
12     #t.join() #一个线程一个线程的执行,并且主线程等待所有子线程执行完再执行
13     t.join(2) #表示主线程只等2秒,如果2秒内子线程没有执行,则不再等待,又称等待的最大时间
14 
15 print("end")    #主线程
2、线程的调用

3、线程锁:

import threading,time
 1 v = 10
 2 lock = threading.Lock()#
 3 # lock = threading.RLock()#递归锁,又称多重锁,同时可以释放多把锁
 4 
 5 def task(args):
 6     time.sleep(2)
 7     lock.acquire()#调用锁(上锁)
 8     #lock.acquire()
 9     global v
10     v -= 1
11     print(v)
12     lock.release()#释放锁(解锁)
13     #lock.release()#只有使用RLock的时候才能开多把
14 
15 for i in range(10):
16     t = threading.Thread(target=task,args=(i,))
17     t.start()
3.1、只能通过一个线程:

 1 v = 10
 2 lock = threading.BoundedSemaphore(3) #可以通过的线程数量
 3 
 4 def task(args):
 5     lock.acquire()#调用锁(上锁)
 6     time.sleep(2)
 7     global v
 8     v -= 1
 9     print(v)
10     lock.release()#释放锁(解锁
11 
12 for i in range(10):
13     t = threading.Thread(target=task,args=(i,))
14     t.start()
3.2、可以通过多个线程:
 1 lock = threading.Event()
 2 
 3 def task(args):
 4     time.sleep(1)
 5     lock.wait()#锁住所有的线程,等待统一释放
 6     print(args)
 7 
 8 for i in range(10):
 9     t = threading.Thread(target=task,args=(i,))
10     t.start()
11 
12 while True:
13     value = input(">>>>:")
14     if value == "1":
15         lock.set()#释放所有的线程
16         #lock.clear() #默认值,等待
3.3、事件锁,锁住所有的线程,同时释放所有的线程
 1 lock = threading.Condition()
 2 
 3 def task(args):
 4     time.sleep(1)
 5     lock.acquire()
 6     lock.wait() #锁住所有的线程
 7     print(args)
 8     lock.release()
 9 
10 for i in range(10):
11     t = threading.Thread(target=task,args=(i,))
12     t.start()
13 
14 while True:
15     value = input(">>>>:")
16     lock.acquire()
17     lock.notify(int(value)) #通知lock.wait可以同时释放多少线程
18     lock.release()
3.4、用户指定锁住和释放的线程:

4、线程池:

  线程池模块:from concurrent.futures import ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import threading,time
1 def task(args):
2     time.sleep(1)
3     print(args)
4 
5 pool = ThreadPoolExecutor(5) #创建连接池,并指定连接池的连接数量
6 
7 for i in range(30):
8     pool.submit(task,i) #到连接池中获取连接,并将数据交给task函数进行处理
4.1、线程池基本用法
 1 import requests  #如果没有此模块则使用:pip3 install requests进行安装
 2 
 3 def task(url):
 4     response = requests.get(url)
 5     print("结果:",url,len(response.content)) #拿到结果后,可以根据自己的需求进行处理
 6 
 7 pool = ThreadPoolExecutor(2)
 8 
 9 url_list = [
10     "http://www.baidu.com",
11     "http://www.open.com.cn",
12     "http://www.sina.com",
13 ]
14 
15 for url in url_list:
16     pool.submit(task,url)
4.2、直接执行:
 1 def download(url):
 2     response = requests.get(url)
 3     return response #此处的response包含了下载的所有内容
 4 
 5 pool = ThreadPoolExecutor(2)
 6 
 7 url_list = [
 8     "http://www.baidu.com",
 9     "http://www.open.com.cn",
10     "http://www.sina.com",
11 ]
12 
13 for url in url_list:
14     future = pool.submit(download,url) #此处只表示下载完成,并未执行其他操作(也可以理解为这里的结果是由download函数处理得到的)
15     future.add_done_callback(test)#当download函数返回response时,future就等于response,同时future也包含了response中所有的内容
16                                   # future.add_done_callback(test)固定写法,括号中是用户想将结果交给那个函数处理的函数名
17 
18 def test(response):
19     download_response = response.result()#通过执行response.result()就相当于得到了download函数中返回值response(result()是固定写法)
20     print("结果:",download_response.url,download_response.status_code)#此处的download_response.url就等于download函数中的response.url
4.3、分步执行,又称回调函数:

将以上过程写成两个文件(即将download和循环操作写成一个模块),在另一个文件中来引用并调用

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 def down(url):
 5     response = requests.get(url)
 6     return response
 7 
 8 def run(url_list):
 9     pool = ThreadPoolExecutor(2)
10     for item in url_list:
11         url = item["url"]
12         call = item["call"]
13         futurt = pool.submit(down,url)
14         futurt.add_done_callback(call)
download模块文件:
模块名为syan
 1 import syan
 2 
 3 def f1(f1_data):
 4     response = f1_data.result()#拿到syan文件中处理的结果
 5     print(response)
 6 
 7 def f2(f2_data):
 8     response = f1_data.result()#拿到syan文件中处理的结果
 9     print(response)
10 
11 def f3(f3_data):
12     response = f1_data.result()#拿到syan文件中处理的结果
13     print(response)
14 
15 url_list = [
16     {"url":"http://www.baidu.com","call":f1},
17     {"url":"http://www.open.com.cn","call":f2},
18     {"url":"http://www.sina.com","call":f3},
19 ]
20 
21 syan.run(url_list)
另一个文件,即用户文件

4.4、线程池整理:

  共分为两种模式:(线程池是创建线程之后来处理批量任务)

  一、将所有处理的任务写在一个函数中

  二、将处理的任务分为两个函数来处理

武sir整理:
 1 def task(url):
 2     """
 3     任务执行两个操作:下载;保存本地
 4     """
 5     # response中封装了Http请求响应的所有数据
 6     # - response.url            请求的URL
 7     # - response.status_code    响应状态码
 8     # - response.text           响应内容(字符串格式)
 9     # - response.content        响应内容(字节格式)
10     # 下载
11     response = requests.get(url)
12     
13     # 下载内容保存至本地
14     f = open('a.log','wb')
15     f.write(response.content)
16     f.close()
17 
18     
19 pool = ThreadPoolExecutor(2)
20 url_list = [
21     'http://www.oldboyedu.com',
22     'http://www.autohome.com.cn',
23     'http://www.baidu.com',
24 ]
25  
26    
27 for url in url_list:
28     print('开始请求',url)
29     # 去连接池中获取链接
30     pool.submit(task,url)
模式一:直接处理

 1 def save(future):
 2     """
 3     只做保存  # future中包含response
 4     """
 5     response = future.result()
 6     
 7     # 下载内容保存至本地
 8     f = open('a.log','wb')
 9     f.write(response.content)
10     f.close()
11 
12 def task(url):
13     """
14     只做下载 requests
15     """
16     # response中封装了Http请求响应的所有数据
17     # - response.url            请求的URL
18     # - response.status_code    响应状态码
19     # - response.text           响应内容(字符串格式)
20     # - response.content        响应内容(字节格式)
21     # 下载
22     response = requests.get(url)
23     return response
24 
25 pool = ThreadPoolExecutor(2)
26 
27 url_list = [
28     'http://www.oldboyedu.com',
29     'http://www.autohome.com.cn',
30     'http://www.baidu.com',
31 ]
32 
33 for url in url_list:
34     print('开始请求',url)
35     # 去连接池中获取链接
36     # future中包含response
37     future = pool.submit(task,url)
38     # 下载成功后,自动调用save方法
39     future.add_done_callback(save)
模式二:分步处理
 

进程: 

进程模块:from multiprocessing import Process
进程的创建与使用与线程完全一致
区别之处:
将线程中的threading.Thread改为Process
线程中的setDaemon(True)在进程中为daemon = True or False
 

进程之间的数据共享:

验证进程之间数据不共享:

 1 from multiprocessing import Process
 2 def task(num,li):
 3     li.append(num)
 4     print(li)
 5 
 6 v = []
 7 
 8 for i in range(10):
 9     p = Process(target=task,args=(i,v,))
10     p.start()
11 
12 
13 结果:
14 [0]
15 [1]
16 [2]
17 [3]
18 [4]
19 [5]
20 [6]
21 [7]
22 [8]
23 [9]
进程:
 1 线程:
 2 from threading import Thread
 3 def task(num,li):
 4     li.append(num)
 5     print(li)
 6 
 7 v = []
 8 
 9 for i in range(10):
10     t = Thread(target=task,args=(i,v,))
11     t.start()
12 
13 
14 结果:
15 [0]
16 [0, 1]
17 [0, 1, 2]
18 [0, 1, 2, 3]
19 [0, 1, 2, 3, 4]
20 [0, 1, 2, 3, 4, 5]
21 [0, 1, 2, 3, 4, 5, 6]
22 [0, 1, 2, 3, 4, 5, 6, 7]
23 [0, 1, 2, 3, 4, 5, 6, 7, 8]
24 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
线程:

实现进程之间数据的共享:

第一种方式:
需要的模块(Array):from multiprocessing import Process,Array
Array是c语言级别的共享,有一定的限制
Array("类型",长度)
 1 from multiprocessing import Process,Array
 2 
 3 def task(num,li):
 4     li[num] = 1  #当num是1的时候,v的列表里就会有一个值变为1,当num是2到时候,v的列表里就会有两个值变为1
 5     print(list(li))
 6 
 7 
 8 v = Array("i",10) #此处v的值为[0,0,0,0,0,0,0,0,0,0]
 9                   #Array("i",10)表示此列表的长度为10,并且数据类型为整数型
10 
11 
12 for i in range(10):
13     p = Process(target=task,args=(i,v,))
14     p.start()
15 
16 
17 结果:
18 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
19 [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
20 [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
21 [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
22 [1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
23 [1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
24 [1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
25 [1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
26 [1, 1, 1, 1, 1, 1, 1, 1, 1, 0]
27 [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Array方式
第二种方式:(通过socket实现通信)
需要的模块(Manager):from multiprocessing import Process,Manager
Manager().dict() / Manager().list()
 1 from multiprocessing import Process,Manager
 2 
 3 def task(num,li):
 4     li.append(num)
 5     print(li)
 6 
 7 
 8 # obj = Manager()#表示创建两socket对象
 9 # dic = obj.dict() #或者可以写成:Manager().dict();特殊的字典
10 v = Manager().list() #特殊的列表
11 
12 
13 for i in range(10):
14     p = Process(target=task,args=(i,v,))
15     p.start()
16 
17 
18 结果:会有报错,是因为连接没有关闭
19 [0]
20 [0, 1, 2]
21 [0, 1, 2]
22 [0, 1, 2, 3]
23 [0, 1, 2, 3, 4, 5]
24 [0, 1, 2, 3, 4, 5]
25 [0, 1, 2, 3, 4, 5, 6]
26 [0, 1, 2, 3, 4, 5, 6, 7]
27 [0, 1, 2, 3, 4, 5, 6, 7, 8]
Manager方式
进程池
  模块:from concurrent.futures import ProcessPoolExecutor
  跟线程池的区别就是将线程池中的ThreadPoolExecutor改为ProcessPoolExecutor即可





原文地址:https://www.cnblogs.com/super-sos/p/6588480.html