1. 进程
引入模块
from multiprocessing import Process #谋定噗rua赛斯听 #噗rua赛斯
进程之间是空间隔离的,不共享资源
进程的两种创建方法
创建进程的第一种方式:
p1 = Process(target=func1, args=(1,)) #target(他给特)#args(啊渴死) p1.start() #start四大特
创建进行的第二种方式:
自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法
class MyProcess(Process): def __init__(self,n,name): super().__init__() self.n = n self.name = name def run(self): # print(1+1) # print(123) print('子进程的进程ID',os.getpid()) print('你看看n>>',self.n) if __name__ == '__main__': p1 = MyProcess(100,name='子进程1') p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法 print('p1.name',p1.name) print('p1.pid',p1.pid) print('主进程结束')
进程的其他方法
import time from multiprocessing import Process def func1(): time.sleep(2) print() print('子进程') if __name__ == '__main__': p1 = Process(target=func1,) p1.start() p1.terminate() # 给操作系统发了一个关闭p1子进程的信号,关闭进程 terminate(涛妹雷特) time.sleep(1) 四离谱 print('进程是否还活着:',p1.is_alive())#判断进程是否存活 is_alive(A子蓝付) print(p1.pid) print('主进程结束')
class MyProcess(Process): def __init__(self,n,name): super().__init__() self.n = n self.name = name def run(self): # print(1+1) # print(123) print('子进程的进程ID',os.getpid()) print('你看看n>>',self.n) if __name__ == '__main__': p1 = MyProcess(100,name='子进程1') p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法 print('p1.name',p1.name) print('p1.pid',p1.pid) print('主进程结束')
join方法
global_num = 100 def func1(): time.sleep(2) global global_num global_num = 0 print('子进程全局变量>>>',global_num) if __name__ == '__main__': p1 = Process(target=func1,) p1.start() print('子进程执行') #time.sleep(3) p1.join() #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行 print('主进程的全局变量>>>',global_num)
global_num = 100 def func1(): start_time = time.time() time.sleep(2) global global_num global_num = 0 print('子进程全局变量>>>',global_num) end_time = time.time() print(end_time - start_time) if __name__ == '__main__': p1 = Process(target=func1,) p1.start() print('子进程执行') #time.sleep(3) p1.join() #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行 print('主进程的全局变量>>>',global_num)
#for循环在创建进程中的应用 def fun1(n): time.sleep(1) print(n) if __name__ == '__main__': pro_list = [] for i in range(10): p1 = Process(target=fun1,args=(i,)) p1.start() pro_list.append(p1) for p in pro_list: p.join() print('主进程结束')
守护进程 一定要在start之前设置守护进程
import time import os from multiprocessing import Process def func1(): time.sleep(5) print(os.getpid()) print('子进程') if __name__ == '__main__': p1 = Process(target=func1,) p1.daemon = True#将p1子进程设置为守护进程 daemon(地们) p1.start() # print('主进程的ID',os.getpid()) print('主进程结束')
三、进程同步(锁)
问题1:为什么要加进程锁?
线程锁是为了在线程不安全的时候,为一段代码加上锁来控制实现线程安全,即线程间数据隔离;
进程间的数据本来就是隔离的,所以一般不用加锁,当进程间共用某个数据的时候需要加锁;
Look(唠嗑)模块 acquire(额快也)#加锁 release(蕊李四)还锁
加锁的另种形式: with
import json import time import random from multiprocessing import Process,Lock def get_ticket(i,ticket_lock): print('我们都到齐了,大家预备!!123') #所有进程异步执行,到这里等待,同时再去抢下面的代码执行 time.sleep(1) ticket_lock.acquire() #这里有个门,只有一个人能够抢到这个钥匙,加锁 with open('ticket','r') as f: #将文件数据load为字典类型的数据 last_ticket_info = json.load(f) #查看一下余票信息 last_ticket = last_ticket_info['count'] #如果看到余票大于0,说明你可以抢到票 if last_ticket > 0: #模拟网络延迟时间 time.sleep(random.random()) #抢到一张票就减去1 last_ticket = last_ticket - 1 last_ticket_info['count'] = last_ticket #将修改后的票数写回文件 with open('ticket','w') as f: #通过json.dump方法来写回文件,字符串的形式 json.dump(last_ticket_info,f) print('%s号抢到了,丫nb!' % i) else: print('%s号傻逼,没票啦,明年再来!' % i) #释放锁,也就是还钥匙的操作 ticket_lock.release() if __name__ == '__main__': #创建一个锁 ticket_lock = Lock() for i in range(10): #将锁作为参数传给每个进程,因为每个进程都需要通过锁来进行限制,同步 p = Process(target=get_ticket,args=(i,ticket_lock,)) p.start()
四、信号量(了解)
Semaphore(赛呢fao)
import time import random from multiprocessing import Process,Semaphore def dbj(i,s): s.acquire() print('%s号男主人公来洗脚'%i) print('-------------') time.sleep(random.randrange(3,6)) # print(time.time()) s.release() if __name__ == '__main__': s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待. for i in range(10): p1 = Process(target=dbj,args=(i,s,)) p1.start()
五、事件(了解)
Event(额外特)
e.clear(可累儿)将e改成False
e.wait(威特)等待
e.set(赛特)将e是True
e.is_set()查看当前e的状态是True或者False
import time import random from multiprocessing import Process,Event #模拟红绿灯执行状态的函数 def traffic_lights(e): while 1: print('红灯啦') time.sleep(5) e.set() print('绿灯亮') time.sleep(3) e.clear() #将e改为了False def car(i,e): if not e.is_set(): #新来的车看到是红灯 print('我们在等待.....') e.wait() print('走你') else: print('可以走了!!!') if __name__ == '__main__': e = Event() hld = Process(target=traffic_lights,args=(e,)) hld.start() while 1: time.sleep(0.5) #创建10个车 for i in range(3): # time.sleep(random.randrange(1,3)) p1 = Process(target=car,args=(i,e,)) p1.start()
复制代码 from multiprocessing import Process,Semaphore,Event import time,random e = Event() #创建一个事件对象 print(e.is_set()) #is_set()查看一个事件的状态,默认为False,可通过set方法改为True print('look here!') # e.set() #将is_set()的状态改为True。 # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr # e.clear() #将is_set()的状态改为False # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr e.wait() #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞 print('give me!!') #set和clear 修改事件的状态 set-->True clear-->False #is_set 用来查看一个事件的状态 #wait 依据事件的状态来决定是否阻塞 False-->阻塞 True-->不阻塞
六、进程间通信 IPC(重要)
队列:(重点)
full(fao)
from multiprocessing import Process,Queue #先进后出 ''' 要注意的事项 q.full()队列满了返回True,不满返回False 队列为空的时候,get会阻塞 盖特 put超出了队列长度,你put插入数据的时候会阻塞 破特 print('>>>',q.empty()) #不可信,队列空了返回True,不为空返回False ''' q = Queue(3) q.put(1)#往队列里写入3个数据 q.put(2) q.put(3) print(q.get())#获取值 #可以用异常踹一下 while 1: try: q.get(False) #queue.Empty except: print('队列目前是空的')
队列实现进程间的通信
import time from multiprocessing import Process,Queue def girl(q): print('来自boy的消息',q.get()) print('来自校领导的凝视',q.get()) def boy(q): q.put('约吗') if __name__ == '__main__': q = Queue(5) boy_p = Process(target=boy,args=(q,)) girl_p = Process(target=girl,args=(q,)) boy_p.start() girl_p.start() time.sleep(1) q.put('好好工作,别乱搞')
生产者消费者模型
import time from multiprocessing import Process,Queue def producer(q): for i in range(1,11): time.sleep(1) print('生产了包子%s号' % i) q.put(i) q.put(None) #针对第三个版本的消费者,往队列里面加了一个结束信号 def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: break else: print('消费者吃了%s包子' % s) if __name__ == '__main__': #通过队列来模拟缓冲区,大小设置为20 q = Queue(20) #生产者进程 pro_p = Process(target=producer,args=(q,)) pro_p.start() #消费者进程 con_p = Process(target=consumer,args=(q,)) con_p.start()
生产者消费者模型主进程发送结束信号
import time from multiprocessing import Process,Queue def producer(q): for i in range(1,11): time.sleep(1) print('生产了包子%s号' % i) q.put(i) def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: break else: print('消费者吃了%s包子' % s) if __name__ == '__main__': #通过队列来模拟缓冲区,大小设置为20 q = Queue(20) #生产者进程 pro_p = Process(target=producer,args=(q,)) pro_p.start() #消费者进程 con_p = Process(target=consumer,args=(q,)) con_p.start() pro_p.join() q.put(None)
JoinableQueue的生产着消费者模型 Joinable(准呢包)Queue 张波Q
q.task_done(她可死但) 给q对象发送一个任务结束的信号
import time from multiprocessing import Process,Queue,JoinableQueue def producer(q): for i in range(1,11): time.sleep(0.5) print('生产了包子%s号' % i) q.put(i) q.join() print('在这里等你') def consumer(q): while 1: time.sleep(1) s = q.get() print('消费者吃了%s包子' % s) q.task_done() #给q对象发送一个任务结束的信号 if __name__ == '__main__': #通过队列来模拟缓冲区,大小设置为20 q = JoinableQueue(20) #生产者进程 pro_p = Process(target=producer,args=(q,)) pro_p.start() #消费者进程 con_p = Process(target=consumer,args=(q,)) con_p.daemon = True # con_p.start() pro_p.join() print('主进程结束')
管道:
进程间通信(IPC)方式二:管道(了解即可)
Pipe(牌破)
from multiprocessing import Process,Pipe def func1(conn1,conn2): try: msg = conn2.recv() print('>>>',msg) #如果管道一端关闭了,那么另外一端在接收消息的时候会报错 msg2 = conn2.recv() #EOFError except EOFError: print('对方管道一端已经关闭') conn2.close() if __name__ == '__main__': conn1, conn2 = Pipe() p = Process(target=func1,args=(conn1,conn2,)) p.start() conn1.send('小鬼!') conn1.close() # conn1.recv() #OSError: handle is closed
数据共享:(了解)
Lock(啦个)
from multiprocessing import Process,Manager,Lock def func(m_dic,ml): #不加锁的情况会出现数据错乱 # m_dic['count'] -= 1 #加锁,这是另外一种加锁形式 with ml: m_dic['count'] -= 1 #等同 # ml.acquire() # m_dic['count'] -= 1 # ml.release() if __name__ == '__main__': m = Manager() ml = Lock() m_dic = m.dict({'count':100}) # print('主进程', m_dic) p_list = [] #开启20个进程来对共享数据进行修改 for i in range(20): p1 = Process(target=func,args=(m_dic,ml,)) p1.start() p_list.append(p1) [ppp.join() for ppp in p_list] print('主进程',m_dic)
进程池和mutiprocess.Poll(噗奥)
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
apply(额扑来) apply_async(额扑来森科) Map(麦坡)
import time from multiprocessing import Pool,Process #针对range(100)这种参数的 # def func(n): # for i in range(3): # print(n + 1) def func(n): print(n) # 结果: # (1, 2) # alex def func2(n): for i in range(3): print(n - 1) if __name__ == '__main__': #1.进程池的模式 s1 = time.time() #我们计算一下开多进程和进程池的执行效率 poll = Pool(5) #创建含有5个进程的进程池 # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能 poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能 # poll.map(func2,range(100)) #如果想让进程池完成不同的任务,可以直接这样搞 #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,如果想做其他的参数之类的操作,需要用后面我们要学的方法。 # t1 = time.time() - s1 # # #2.多进程的模式 # s2 = time.time() # p_list = [] # for i in range(100): # p = Process(target=func,args=(i,)) # p_list.append(p) # p.start() # [pp.join() for pp in p_list] # t2 = time.time() - s2 # # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高 # print('t2>>',t2) #结果:12.092015027999878s
同步与异步两种执行方式:
同步:
import time from multiprocessing import Process,Pool def fun(i): time.sleep(0.5) # print(i) return i**2 if __name__ == '__main__': p = Pool(4) for i in range(10): res = p.apply(fun,args=(i,)) #同步执行的方法,他会等待你的任务的返回结果, print(res)
异步:
import time from multiprocessing import Process,Pool def fun(i): time.sleep(1) print(i) return i**2 if __name__ == '__main__': p = Pool(4) res_list = [] for i in range(10): res = p.apply_async(fun,args=(i,)) res_list.append(res) p.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池 p.join() # 这是感知进程池中任务的方法,进程池中所有的进程随着主进程的结束而结束了,等待进程池的任务全部执行完 #循环打印结果 for e_res in res_list: print('结果:',e_res.get()) print('主进程结束')
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,
这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) # print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) # print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主进程:',os.getpid()) p = Pool(4) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join()
回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。