一、并发编程理论基础
- 并发编程得应用:
- 网络应用:爬虫(直接应用并发编程)
- 网络架构django、flask、tornado 源码-并发编程
- socket server 源码-并发编程
- 计算机操作系统发展史
- 手工操作-读穿孔的纸带、用户独占全机、cpu等待手工操作、cpu利用不充分
- 批处理-磁带存储(联机批处理{一边写入磁带,一边从磁带读出计算}、脱机批处理、只能运行一个程序,遇到IO就等待闲置
- 多道程序系统
- 同时执行多个任务,遇到io就切换,
- 即空间隔离(多个程序运行),时空复用(IO切换)的特点
- 分时系统
- 同时执行多个任务,没有遇到IO也切换,固定时间片到了就切
- 时间片轮转
- 多路性、交互性、独立性、及时性
- 切会浪费cpu时间,降低了cpu效率,但提高了用户体验。
- 实时系统
- 及时响应
- 高可靠性
- 时刻等待响应,即时处理
- 通用操作系统:
- 多道批处理
- 分时
- 实时
- 操作系统分类:
- 个人计算机操作系统
- 网络操作系统
- 分布式操作系统
- 操作系统的作用:
- 是一个协调、管理、和控制计算机硬件资源和软件资源的控制程序。【调度硬件资源、调用管理软件】
- 隐藏了丑陋的硬件调用接口,提供良好的抽象接口
- 管理、调度进程,并将多个进程对硬件的竞争变得有序
- I/O操作:(相对内存来说得)
- 输入【读到内存中】: input、f.read、accept、recv、connect
- 输出【从内存读出】:print、f.write、send、connect
- 文件操作/网络操作都是IO操作
- 异步:两个任务同时运行(并行)
- 同步:多个任务 串行 【按顺序执行】
- 阻塞:等待 input、accept、recv
- 非阻塞:不等待,直接执行
- 并行:多个任务同时执行【多个CPU在同时执行任务】
- 并发:只有一个CPU,交替执行多个任务【宏观上同时执行,实际是轮转】
- https://www.bughui.com/2017/08/23/difference-between-concurrency-and-parallelism/
- 程序:没有运行
- 进程:运行中得程序、计算机中最小得资源分配单元
- 进程调度:
- 多个进程(运行中得程序)在操作系统得控制下被CPU执行,去享用计算机资源
- 先来先服务调度算法
- 短作业优先
- 时间片轮转
- 多级反馈队列
- 进程调度得过程是不能随意被程序影响得
- 进程三大状态
- 就绪 程序开始运行,进入就绪队列,等待cpu分配时间
- 运行 执行进程,单个时间片内运行完成,就释放资源,没有左右完,就又自动切换到就绪队列等待下次得调度
- 阻塞 执行中得进程遇到事件入IO等导致无法执行,进入阻塞状态,解除后进入就绪队列等待进程调度
- 进程与父进程
- 进程 PID 通过os.getpid() 可以得到
- 父进程 PPID 负责回收一些子进程得资源后才关闭 通过os.getppid()
二、python中的并发编程-进程
- 包:multiprocessing
- 是python中操作、管理进程
- 创建进程、进程同步、进程池、进程之间数据共享
- python中的进程创建
- 对Windows来说创建子进程,是将主进程内存空间中的所有变量import子进程内存空间中,(import就会执行文件代码)
- windows平台创建子进程必须放在 if __name__ == '__main__' 下执行(不然就会循环创建子进程导致系统崩溃)
- 对Linux来说创建子进程,就是将主进程内存空间中所有复制过去,复制不会执行内存空间中的代码
- Linux平台就无所谓了,反正只是复制
-
创建进程:异步运行
- multiprocess.Process模块
- Process() 由该类实例化得到得对象,表示一个子进程中得任务(尚未启动)
- p = Process(group,target= func,agrs=('name',),kwargs={},name=)
- group 值默认始终为None
- target 表示调用对象
- args 调用对象得位置参数 元组!!!
- kwargs 调用对象得字典
- name为子进程得名称
- 注:必须用关键字方式指定参数、位置参数必须是元组格式,那怕是一个参数也要写成(n,)
- 方法:
- p.start() : 启动进程,调用操作系统的命令,传达要创建进程的申请,并调用子进程中的p.run()方法
- p.run(): 进程启动时运行的方法,其是真正去调用target指定的函数,自定义类,就需要自己定义run方法
- p.terminate(): 强制终止进程p
- p.is_alive(): 查看进程状态值,True表示运行中
- p.join(): 主线程等待子进程p运行结束后才运行 阻塞
- 实操:
-
复制代码 1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def f(name): 7 print('in f', os.getpid(), os.getppid()) 8 print('i am is son process', name) 9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=('小青',)) 13 p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程 14 15 print('我是主程序……') 》》》结果: 我是主程序…… in f 6016 12312 i am is son process 小青 结论:在另一个地方开辟内存空间,执行f函数,主进程不会阻塞等待
- 给子进程传参:Process(target= func ,args = (参1,))必须是元组结构!!!
-
1 def f(args): 2 print('in func 2', args, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('我在主进程中……') 7 p1 = Process(target=f, args=(666,)) 8 p1.start() 9 p2 = Process(target=f, args=(777,)) 10 p2.start() 11 print('我会在子进程前运行……,因为我和他们是隔离的,不会等他们') 12 13 ''' 14 结果: 15 我在主进程中…… 16 我会在子进程前运行……,因为我和他们是隔离的,不会等他们 17 in func 2 666 8144 7660 18 in func 2 777 10820 7660 19 '''
- 创建多个子进程:start()方法不是执行方法,而是创建子进程函数,真正执行任务函数是run()方法
-
1 def fu(num): 2 print("我是进程 :%d " % num) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(), os.getppid()) 7 for i in range(10): 8 Process(target=fu, args=(i,)).start() 9 print('main 66666') 10 """ 11 结果: 12 in main 11984 2064 13 main 66666 14 我是进程 :1 15 我是进程 :0 16 我是进程 :8 17 我是进程 :7 18 我是进程 :4 19 我是进程 :2 20 我是进程 :9 21 我是进程 :3 22 我是进程 :6 23 我是进程 :5 24 p.start()并没有立即执行,而是进入就绪队列,等带cpu调度,所以不是有序的 25 """
- join方法:阻塞、等待子进程执行完毕后再执行后面的代码
-
1 def g(args): 2 print("in g", args) 3 4 5 if __name__ == '__main__': 6 print('in main') 7 p = Process(target=g, args=(888,)) 8 p.start() 9 p.join() 10 print('i am in main process') 11 ''' 12 in main 13 in g 888 14 i am in main process 15 结:join总是等子进程执行完毕后再执行接下来的代码 16 '''
- 多进程中运用join方法:必须保证所有子进程结束,而操作系统创建子进程顺序是不定的,故需遍历每个子进程,每个都进行join一下
-
1 def pro(i): 2 print('in func', i, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(),os.getppid()) 7 p_list = [] 8 for i in range(10): 9 p = Process(target=pro, args=(i,)) 10 p.start() # 不是运行一个程序,而是调用操作系统命令,要创建子进程,等待操作系统作业,非阻塞 11 p_list.append(p) 12 print(p_list) 13 for p in p_list: # 遍历每个子进程,每个join一下,如果该子进程已经接收,join失效相当于pass,遍历完成就能保证每个子进程都结束了 14 p.join() # 阻塞,直到p这个子进程执行完毕之后再继续执行 15 print('主进程……') 16 ''' 17 in main 1480 2064 18 [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>, <Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>] 19 in func 3 6108 1480 20 in func 7 13756 1480 21 in func 5 12548 1480 22 in func 8 12116 1480 23 in func 4 10948 1480 24 in func 6 11744 1480 25 in func 9 11244 1480 26 in func 1 3968 1480 27 in func 2 9412 1480 28 in func 0 14024 1480 29 主进程…… 30 '''
- p.is_alive() 和 p.terminate() :查看子进程生命状态及强制结束子进程(terminate是非阻塞,并不会等待子进程彻底结束才执行后面的代码,只是发一信息要终结,就不管了)
-
1 # p.is_alive方法:查看子进程生存状态 2 # p.terminate() 强制结束子进程--非阻塞 3 def gro(i): 4 time.sleep(1) 5 print('in func', i, os.getpid(), os.getppid()) 6 7 8 if __name__ == '__main__': 9 print("in main") 10 p1 = Process(target=gro, args=(1,)) 11 p1.start() 12 # time.sleep(2) # 如果等待一会儿,就会执行函数,如果不等,就不管操作系统去建子进程,而直接执行后面的代码,所以可能比创建子进程前就执行了 13 print(p1.is_alive()) # 检测子进程是否还在执行任务 14 p1.terminate() # 强制结束子进程,非阻塞,不会等待状态改变,会马上执行后面代码 15 print(p1.is_alive()) 16 print('主进程的代码执行结束了……') 17 ''' 18 in main 19 True 20 True 21 主进程的代码执行结束了…… 22 23 结:因为直接执行,主进程执行快些,子进程函数不会执行 24 '''
- 通过面向对象的方法创建子进程(重点:重写run方法,继承Process类,继承父类__init__方法)
-
1 class Myprocess(Process): 2 def __init__(self, name): 3 super().__init__(self) # 需继承父类的init方法 4 self.name = name # 添加需要自己的属性 5 6 def run(self): 7 print(self.name) # 只有重写run方法才能将参数传入 8 print(os.getppid(), os.getpid()) 9 10 11 if __name__ == '__main__': 12 p = Myprocess('小强') 13 p.start()
- 进程与进程之间内存中的数据是隔离的!!!
- 进程与进程之间是不能自由的交换内存数据的【内存空间是不能共享的】
- 全局的变量在子进程中修改,其他进程是感知不到的【子进程的执行结果父进程是获取不到的】
- 进程与进程之间想要同行,必须借用其他手段,且两个进程都是自愿的【父子进程通信是通过socket】
-
1 from multiprocessing import Process 2 3 n = 100 4 5 6 def func(): 7 global n 8 n = n - 1 9 return 111 10 11 12 if __name__ == '__main__': 13 n_l = [] 14 for i in range(100): 15 p = Process(target=func) 16 p.start() 17 n_l.append(p) 18 for p in n_l: p.join() 19 print(n) 20 21 结果为:100 22 23 总结:说明子进程无法改变主进程的全局变量,本质是无法自由通信,但子进程中的n肯定减少了,只是没法拿出来
- 守护进程 p.daemon = True
- 特点:守护进程的生命周期只和主进程的代码有关系,和其他子进程没有关系,会随着主进程结束而结束
- 作用:报活,监控主进程生命状态
- 主进程创建守护进程
- 守护进程会在主进程代码结束后就终止
- 守护进程内无法再开子进程,否咋抛出异常 AssertionError: daemonic processes are not allowed to have children
- 守护进程的属性,默认是False,如果设置程True,就表示设置这个子进程为一个守护进程
- 设置守护进程的操作应该在开子进程之前即p.start()之前
-
1 from multiprocessing import Process 2 import time 3 def func1(): 4 print('begin') 5 time.sleep(3) 6 print('wawww') 7 8 # if __name__ == '__main__': 9 # p = Process(target=func1) 10 # # p.daemon = True 11 # p.start() 12 # time.sleep(1) 13 # print('in main') 14 ''' 15 结果: 16 begin 17 in main 18 19 结论:守护进程随着主进程结束而结束,那怕守护进程任务没有执行完毕 20 ''' 21 22 def f1(): 23 print('begin fun1') 24 time.sleep(3) 25 print('baidu') 26 27 def f2(): 28 while True: 29 print('in f2') 30 time.sleep(0.5) 31 32 if __name__ == '__main__': 33 Process(target=f1,).start() 34 p = Process(target=f2) 35 p.daemon = True 36 # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程 37 # 设置守护进程的操作应该在开启子进程之前 38 p.start() 39 time.sleep(1) 40 print('in main') # 主进程in main执行完后,守护进程就会结束,但主进程并没有结束而是等另一个子进程结束后才结束 41 42 43 # 设置成守护进程之后 会有什么效果呢? 44 # 守护进程会在主进程的代码执行完毕之后直接结束,无论守护进程是否执行完毕 45 46 # 应用 47 # 报活 主进程还活着 48 # 100台机器 100个进程 10000进程 49 # 应用是否在正常工作 - 任务管理器来查看 50 # 守护进程如何向监测机制报活???send/写数据库 51 # 为什么要用守护进程来报活呢?为什么不用主进程来工作呢??? 52 # 守护进程报活几乎不占用CPU,也不需要操作系统去调度 53 # 主进程不能严格的每60s就发送一条信息
-
进程同步控制
- 进程的同步控制 - 进程之间有一些简单的信号传递,但是用户不能感知,且用户不能传递自己想传递的内容
- 锁:multiprocessing.Lock ********* 【互斥锁】
- lock = Lock() # 创造一把锁
- lock.acquire() # 获取了这把锁的钥匙、阻塞,锁未换就一直阻塞着
- lock.release() # 归还这把钥匙
- 解决多个进程共享一段数据的时候,数据会出现不安全的现象,通过加锁来维护数据的安全性,同一刻,只允许一个线程修改数据
-
1 import json 2 import time 3 from multiprocessing import Lock 4 from multiprocessing import Process 5 6 # 锁 7 lock = Lock() # 创造了一把锁 8 lock.acquire() # 获取了这把锁的钥匙 9 lock.release() # 归还这把钥匙,其他进程就可以拿锁了 10 11 12 # 抢票的故事 13 # 需求:每个人都能查看余票、买相同车次票同一刻只能一人买完,另一人才能买 14 15 16 def search(i): 17 with open('db', encoding='utf-8') as f: 18 count_dic = json.load(f) 19 time.sleep(0.2) # 模拟网络延迟 20 print('person %s 余票:%s 张' % (i, count_dic.get('count'))) 21 return count_dic.get('count'), count_dic # 返回余票数,及字典 22 23 24 def buy(i): 25 count, count_dict = search(i) 26 if count > 0: 27 count_dict['count'] -= 1 # 有票就可以买 28 print('person %s 买票成功'% i) 29 time.sleep(2) 30 with open('db', 'w', encoding='utf-8') as f: 31 json.dump(count_dict, f) # 更改余票额度 32 33 34 def task(i, lock): 35 search(i) 36 lock.acquire() # 如果之前已经被acquire了 且 没有被release 那么进程会在这里阻塞 37 buy(i) 38 lock.release() 39 40 41 if __name__ == '__main__': 42 lock = Lock() 43 for i in range(1, 11): 44 Process(target=task, args=(i, lock)).start() 45 46 # 当多个进程共享一段数据的时候,数据会出现不安全的现象, 47 # 需要加锁来维护数据的安全性 48 49 ''' 50 D:installPython36python.exe D:/install/project/7、并发编程/3、锁.py 51 person 6 余票:5 张 52 person 5 余票:5 张 53 person 8 余票:5 张 54 person 2 余票:5 张 55 person 4 余票:5 张 56 person 10 余票:5 张 57 person 1 余票:5 张 58 person 9 余票:5 张 59 person 3 余票:5 张 60 person 7 余票:5 张 61 person 6 余票:5 张 62 person 6 买票成功 63 person 5 余票:4 张 64 person 5 买票成功 65 person 8 余票:3 张 66 person 8 买票成功 67 person 2 余票:2 张 68 person 2 买票成功 69 person 4 余票:1 张 70 person 4 买票成功 71 person 10 余票:0 张 72 person 1 余票:0 张 73 person 9 余票:0 张 74 person 3 余票:0 张 75 person 7 余票:0 张 76 77 Process finished with exit code 0 78 '''
注:进程间的数据交互,本质也用到了socket通信,不过都是本地的,基于文件的,可以通过将py名写成socket来看报错得知。
-
1 #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 2 虽然可以用文件共享数据实现进程间通信,但问题是: 3 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 4 2.需要自己加锁处理 5 6 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 7 队列和管道都是将数据存放于内存中 8 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 9 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
- 信号量:multiprocessing.Semaphore
- 资源有限,同时允许一定数量的进程访问修改数据,即一把锁对应多把钥匙
- 本质:锁+ 计数器
-
1 互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。 2 假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。 3 实现: 4 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。 5 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念 6 7 8 import time, random 9 from multiprocessing import Semaphore, Process 10 11 # ktv 只有4个房间,即同时只能四个人进去,其他人必须等其中的人出来才能进去 12 # 13 # sem = Semaphore(4) # 设置信号量个数,并发数 14 # sem.acquire() 15 # print('进去1个人,关门阻塞中') 16 # sem.acquire() 17 # print('进去第2个人,关门阻塞中') 18 # sem.acquire() 19 # print('进去第3个人,关门阻塞中') 20 # sem.acquire() 21 # print('进去第4个人,关门阻塞中') 22 # sem.release() # 必须归还一把,才能继续下面的代码,不然一直阻塞中 23 # sem.acquire() 24 # print(6666) 25 # sem.release() 26 ''' 27 D:installPython36python.exe D:/install/project/7、并发编程/4、信号量.py 28 进去1个人,关门阻塞中 29 进去第2个人,关门阻塞中 30 进去第3个人,关门阻塞中 31 进去第4个人,关门阻塞中 32 6666 33 34 Process finished with exit code 0 35 ''' 36 37 38 def ktv(num, sem): 39 sem.acquire() 40 print('person %s 进入了ktv' % num) 41 time.sleep(random.randint(1, 4)) 42 print('person %s 进出了ktv' % num) 43 sem.release() 44 45 46 if __name__ == '__main__': 47 sem = Semaphore(4) 48 for i in range(10): 49 Process(target=ktv, args=(i, sem)).start() 50 51 ''' 52 最开始是4个同时进入,之后又人出,才能有人进 53 D:installPython36python.exe D:/install/project/7、并发编程/4、信号量.py 54 person 2 进入了ktv 55 person 8 进入了ktv 56 person 9 进入了ktv 57 person 7 进入了ktv 58 person 2 进出了ktv 59 person 6 进入了ktv 60 person 7 进出了ktv 61 person 5 进入了ktv 62 person 8 进出了ktv 63 person 1 进入了ktv 64 person 9 进出了ktv 65 person 0 进入了ktv 66 person 1 进出了ktv 67 person 4 进入了ktv 68 person 6 进出了ktv 69 person 3 进入了ktv 70 person 0 进出了ktv 71 person 5 进出了ktv 72 person 4 进出了ktv 73 person 3 进出了ktv 74 75 Process finished with exit code 0 76 '''
-
事件:multiprocessing.Event
-
定义:全局定义了一个‘flag’,如果标志为False,当程序执行event.wait方法时就会阻塞,如果为True,那么event.wait方法时便不再阻塞。
- 作用:主进程控制其他线程的执行,实现两个或多个线程间的交互
- 使用:
- 事件创立之初,默认是Faslse,即阻塞状态
- e = Event() # 创建事件,默认False
- print(e.is_set()) # 查看状态
- e.set() # 将标志设置为True
- e.clear() # 将标志设置为False
- e.wait() # 等待,当标志为False,那么阻塞,当标志为True,那么非阻塞,wait什么也不做直接pass
- e.wait(timeout=10) # 如果信号在阻塞10s之内变为True,那么就不继续阻塞直接pass,如果就阻塞10s之后状态还是没变,那么继续阻塞
-
1 from multiprocessing import Process, Event 2 import time, random 3 4 5 def car(e, n): 6 while True: 7 if not e.is_set(): # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色 8 print('