网络编程6_multiprocess模块.锁.队列

一. multiprocess模块 仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,但是通过一些特殊的方法,可以实现进程之间数据的共享。 1. Process模块 Process是穿件进程的模块, 借助这个模块, 可以实现进程的创建 Process([group [, target [, name [, args [, kwargs]]]]])由该类实例化一个对象, 表示一个子进程中的任务(尚未启动) 强调: (1). 需要使用关键字的方式来指定参数 (2). args指定的为传给target函数的位置参数, 是一个元祖形式, 必须有逗号 (1). 看一个程序实例: from multiprocessing import Process def func(): print(12345) # 当我们运行当前这个test.py文件的时候, 就产生了进程, 这个进程我们称之为主进程 if __name__ == '__main__': # 将函数注册到一个进程中, p是一个进程对象, 此时还没有启动进程, 只是创建了一个进程对象, 并且func是不加括号的的, 因为加上括号就直接运行了 p = Process(target=func, ) # 告诉操作系统, 给我开启一个进程, func这个函数就被我们新开的这个进程执行了, 而这个进程是我主程序创建出来的所以称这个新创建的进程为主进程的子进程, 而主进程又可以称之为这个新进程的父进程 # 而这个子进程中执行的程序,相当于将现在这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就相当于当前这个文件,被另外一个py文件import过去并执行了。 # start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。 p.start() # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步 print("*"*10) (2). 上面说了,我们通过主进程创建的子进程是异步执行的,那么我们就验证一下,并且看一下子进程和主进程(也就是父进程)的ID号(讲一下pid和ppid,使用pycharm举例),来看看是否是父子关系。 import time import os # os.getpid() # 获取自己的进程号 # os.getppid() # 获取自己进程的父进程的ID号 from multiprocessing import Process def func(): print("aaaaa") time.sleep(1) print("子进程>>>", os.getpid()) print("该子进程的父进程>>>", os.getppid()) print(12345) if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父进程>>>", os.getpid()) print("父进程的父进程>>>", os.getppid()) # ********** # 首先打印出来了主进程的程序,然后打印的是子进程的,也就是子进程是异步执行的,相当于主进程和子进程同时运行着,如果是同步的话,我们先执行的是func(),然后再打印主进程最后的10个*号。 # 父进程>>> 9044 # 父进程的父进程>>> 9528 #我运行的test.py文件的父进程号,它是pycharm的进程号 # aaaaa # 子进程>>> 10476 # 该子进程的父进程>>> 9044 #是我主进程的ID号,说明主进程为它的父进程 # 12345 (3). 看一个问题,说明linux和windows两个不同的操作系统创建进程的不同机制导致的不同结果: import time import os from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子进程>>',os.getpid()) print('该子进程的父进程>>',os.getppid()) print(12345) print('太白老司机') """如果我在这里加了一个打印,你会发现运行结果中会出现两次打印出来的太白老司机,因为我们在主进程中开了一个子进程,子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印, 其实是因为windows开起进程的机制决定的,在linux下是不存在这个效果的,因为windows使用的是process方法来开启进程,他就会拿到主进程中的所有程序,而linux下只是去执行我子进程中注册的那个函数,不会执行别的程序,这也是为什么在windows执行程序的时候,要加上if __name__ == '__main__':,否则会出现子进程中运行的时候还开启子进程,那就出现无限循环的创建进程了,就报错了""" if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父进程>>>", os.getpid()) print("父进程的父进程>>>", os.getppid()) # 太白老司机 # ********** # 父进程>>> 11204 # 父进程的父进程>>> 9528 # 太白老司机 # aaaa # 子进程>> 6644 # 该子进程的父进程>> 11204 # 12345 (4). 一个进程的生命周期:如果子进程的运行时间长,那么等到子进程执行结束程序才结束,如果主进程的执行时间长,那么主进程执行结束程序才结束,实际上我们在子进程中打印的内容是在主进程的执行结果中看不出来的,但是pycharm帮我们做了优化,因为它会识别到你这是开的子进程,帮你把子进程中打印的内容打印到了显示台上。 如果说一个主进程运行完了之后,我们把pycharm关了,但是子进程还没有执行结束,那么子进程还存在吗?这要看你的进程是如何配置的,如果说我们没有配置说我主进程结束,子进程要跟着结束,那么主进程结束的时候,子进程是不会跟着结束的,他会自己执行完,如果我设定的是主进程结束,子进程必须跟着结束,那么就不会出现单独的子进程(孤儿进程)了,具体如何设置,看下面的守护进程的讲解。比如说,我们将来启动项目的时候,可能通过cmd来启动,那么我cmd关闭了你的项目就会关闭吗,不会的,因为你的项目不能停止对外的服务,对吧 (5). Process类中参数的介绍: 参数介绍: ①. group参数未使用,值始终为None ②. target表示调用对象,即子进程要执行的任务 ③. args表示调用对象的位置参数元组,args=(1,2,'egon',) ④. kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} ⑤. name为子进程的名称 给要执行的函数传参数: def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','来玩啊!'))#这是func需要接收的参数的传送方式。 p.start() print('父进程执行结束!') #执行结果: 父进程执行结束! 姑娘 来玩啊! (6). Process类中各方法的介绍: ①. p.start():启动进程,并调用该子进程中的p.run() ②. p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 ③. p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 ④. p.is_alive():如果p仍然运行,返回True ⑤. p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 jion方法的例子: 让主进程加上join的地方等待(也就是阻塞住), 等待子程序执行完之后, 在继续往下执行主进程, 很多时候, 我们主进程需要子进程的执行结果, 所以必须要等待, join有点像把子程序和主程序拼接起来, 将异步改为同步执行 import time from multiprocessing import Process def func(x, y): print(x) time.sleep(1) print(y) if __name__ == "__main__": p = Process(target=func, args=("姑娘", "来玩啊")) p.start() print("这里是异步的") p.join() print("父程序执行结束") # 打印结果 这里是异步的 姑娘 来玩啊 父程序执行结束 用for循环开启多个线程: import time import os from multiprocessing import Process def func(x, y): print(x) # time.sleep(1) print(y) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=func, args=("姑娘%s"%i, "来玩啊")) p_list.append(p) p.start() # 1、如果加到for循环里面,那么所有子进程包括父进程就全部变为同步了,因为for循环也是主进程的,循环第一次的时候,一个进程去执行了,然后这个进程就join住了,那么for循环就不会继续执行了,等着第一个子进程执行结束才会继续执行for循环去创建第二个子进程。 # 2、如果我不想这样的,也就是我想所有的子进程是异步的,然后所有的子进程执行完了再执行主进程 p.join() # 4、这是解决办法,前提是我们的子进程全部都已经去执行了,那么我在一次给所有正在执行的子进程加上join,那么主进程就需要等着所有子进程执行结束才会继续执行自己的程序了,并且保障了所有子进程是异步执行的。 [ap.join() for ap in p_list] # 3、如果这样写的话,多次运行之后,你会发现会出现主进程的程序比一些子进程先执行完,因为我们p.join()是对最后一个子进程进行了join,也就是说如果这最后一个子进程先于其他子进程执行完,那么主进程就会去执行,而此时如果还有一些子进程没有执行完,而主进程执行完了,那么就会先打印主进程的内容了,这个cpu调度进程的机制有关系,因为我们的电脑可能只有4个cpu,我的子进程加上住进程有11个,虽然我for循环是按顺序起进程的,但是操作系统一定会按照顺序给你执行你的进程吗,答案是不会的,操作系统会按照自己的算法来分配进程给cpu去执行,这里也解释了我们打印出来的子进程中的内容也是没有固定顺序的原因,因为打印结果也需要调用cpu,可以理解成进程在争抢cpu,如果同学你想问这是什么算法,这就要去研究操作系统啦。那我们的想所有子进程异步执行,然后再执行主进程的这个需求怎么解决啊 p.join() print("不要钱") 模拟两个应用场景, 1. 同时对一个文件进行写操作, 2. 同时创建多个文件 import time import os import re from multiprocessing import Process # 多进程同时对一个文件进行写操作 # def func(x, y, i): # with open(x, "a", encoding = "utf-8") as f: # print("当前进程%s拿到的文件的光标位置>>%s" % (os.getpid(), f.tell())) # f.write("%s "%y) # 多线程同时创建多个文件 def func(x, y): with open(x, "w", encoding="utf-8") as f: f.write(y) if __name__ == '__main__': p_list = [] for i in range(10): # p = Process(target=func, args=("can_do_girl_lists.txt", "姑娘%s"%(i+1), i+1)) p = Process(target=func, args=("girl/can_do_girl_lists_%s.txt"%(i+1), "姑娘%s"%(i+1))) p_list.append(p) p.start() [ap.join() for ap in p_list] # with open("can_do_girl_lists.txt", "r", encoding="utf-8") as f: # data = f.read() # all_num = re.findall("d+", data) # print(">>>>>", all_num, ".....%s"%(len(all_num))) # print([i for i in os.walk(r"D:/1PY/Day30")]) print("不要钱") (7). Process类中自带封装的各属性的介绍 ①. p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 ②. p.name:进程的名称 ③. p.pid:进程的pid ④. p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) ⑤. p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可) 2. Process类的使用 注意: 在windows中Process()必须放到 if __name__ == "__main__": 下 由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。 进程创建的第二种方法(继承): (1). 进程创建的第二种方法: import os from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() # 必须执行父类的__init__方法 self.person = person def run(self): print(os.getpid()) print(self.pid) print("%s正在和女主播聊天" % self.person) # def start(self): # self.run() # print("我是%s" % self.person) if __name__ == '__main__': p1 = MyProcess("Jedan") p2 = MyProcess("太白") p3 = MyProcess("alexDSB") p1.start() p2.start() p2.run() p3.start() p1.join() p2.join() p3.join() (2). 进程之间的数据是隔离的 from multiprocessing import Process n = 100 def work(): global n n = 0 print("子进程内:", n) # print(n) if __name__ == '__main__': p = Process(target=work) p.start() p.join() #等待子进程执行完毕,如果数据共享的话,我子进程是不是通过global将n改为0了,但是你看打印结果,主进程在子进程执行结束之后,仍然是n=100,子进程n=0,说明子进程对n的修改没有在主进程中生效,说明什么?说明他们之间的数据是隔离的,互相不影响的 print("主进程内:", n) # 子进程内: 0 # 主进程内: 100 (3). 多线程实现多个客户端通信 (4). is_alive(), terminate() import time from multiprocessing import Process class Piao(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is 打飞机" % self.name) s = input("!!!") # 在pycharm下子进程中不能input输入, 会报错 EOFError: EOF when reading a line, 因为子进程中没有像我们主进程这样的在pycharm下的控制台可以输入东西的地方 time.sleep(2) print("%s is 打飞机结束" % self.name) if __name__ == '__main__': p1 = Piao("太白") p1.start() p1.join() # time.sleep(5) p1.terminate() # 关闭进程, 不会立即关闭, 有个等着操作系统去关闭这个进程的时间, 所以is_alive立刻查看的结果可能还是存活, 但是稍微等一会就关闭了 print(p1.is_alive()) # 查看子程序是否还存活 print("等会...") time.sleep(1) print(p1.is_alive()) (5). 僵尸进程(有害)和孤儿进程(无害) 僵尸进程: 一个进程使用fork创建子进程, 如果子程序退出, 而父程序没有调用wait或waitpid获取子进程的状态消息, 那么子进程的进程描述符任然保存在系统中, 这个进程称之为僵尸进程 任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。 孤儿进程: 一个父进程退出, 而它的一个或多个子进程还在运行, 那么那些子进程将会成为孤儿进程, 孤儿进程将会被init进程(进程号为1)所收养, 并由init进程对他们完成状态收集工作 僵尸进程的危害场景: 例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。 3. 守护进程 如果主进程结束了, 由主进程创建的子进程必须跟着结束, 这时就需要守护进程 主进程创建守护进程: 其一. 守护进程会在主进程代码执行结束后就终止 其二. 守护进程内无法再开启子进程, 否则会抛出异常AssertionError: daemonic processes are not allowed to have children 注意: 进程之间是相互独立的, 主进程代码运行结束, 守护进程随即终止 import os, time from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() self.name = name deg run(self): print(os.getpid(), self.name) print("%s正在和女主播聊天" % self.name) time.sleep(3) if __name__ == "__mian__": p = MyProcess("太白") p.daemon = True # 一定要在p.start()之前设置p为守护进程, 禁止p创建子进程, 并且父进程代码执行结束, p即终止运行 p.start() print("宝") 4. 进程同步(锁) 利用并发编程可以更加充分的利用io资源, 但也带来了新的问题: 进程之间的数据不共享, 但是共享同一套文件系统, 所以访问同一个文件或者同一个打印终端, 是没有问题的, 而共享带来的是竞争, 竞争带来的是错乱, 如何控制, 就是枷锁 (1). 多进程抢占输出资源, 导致打印混乱的示例: import os, time, random from multiprocessing import Process def work(n): print("%s:%s is running" % (n, os.getpid())) time.sleep(random.randint(1,3)) print("%s:%s is done" % (n, os.getpid())) if __name__ == '__main__': for i in range(5): p = Process(target = work, args = (i,)) p.start() # 3:6716 is running # 4:10220 is running # 0:5524 is running # 1:3164 is running # 2:10036 is running # 3:6716 is done # 4:10220 is done # 1:3164 is done # 0:5524 is done # 2:10036 is done 两个问题: 一. 每个进程中work函数的第一个打印就不是按照我们for循环的顺序来打印的 二. 每个work都要两个打印, 但时第一个打印的顺序是3-4-0-1-2, 而第二个打印的顺序是3-4-1-0-2, 说明我们一个进程中的程序顺序都乱了 第二个问题可以通过枷锁来解决, 第一个问题是没法解决的, 因为进程开到了内核, 由操作系统来决定进程的调度, 无法控制 (2). 加锁, 由并发改为了串行 import os, time from multiprocessing import Process, Lock def work(n, l): # 加锁, 保证每一次只有一个进程在执行锁里面的程序, 这一段程序对于所有写上这个锁的进程, 大家都变成了串行 lock.acquire() print("%s:%s is running" % (n, os.getpid())) time.sleep(1) print("%s:%s is done" % (n, os.getpid())) # 解锁, 解锁之后其他进程才能去执行自己的程序 lock.release() if __name_ == "__main__": lock = Lock() for i in range(5): p = Process(target=work, args=(i, l)) p.start() # 2:4032 is running # 2:4032 is done # 0:8444 is running # 0:8444 is done # 4:2872 is running # 4:2872 is done # 3:7480 is running # 3:7480 is done # 1:5196 is running # 1:5196 is done 结果分析:(自己去多次运行一下,看看结果,我拿出其中一个结果来看)通过结果我们可以看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪个进程的程序是不固定的,但是我们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,然后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明我们控制住了同一进程中的代码执行顺序,如果涉及到多个进程去操作同一个数据或者文件的时候,就不担心数据算错或者文件中的内容写入混乱了。 上面这种情况虽然使用加锁的形式实现了顺序的执行, 但是程序又重新变成串行了, 这样确实会浪费了时间, 但是却保证了数据的安全 (3). 模拟抢票 import json, time from multiprocessing import Process, Lock def check(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) print(ticket_dic) print("%s查看了余票数, 尚有余票%s张" % (n, ticket_dic["余票"])) def buy(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) if ticket_dic["余票"]>0: time.sleep(1) ticket_dic["余票"] -= 1 json.dump(ticket_dic, open("ticketinfo.json", "w", encoding="utf-8"), ensure_ascii=False) print("%s购票成功" % n) else: print("没票了") def task(n, lock): # def task(n): check(n) lock.acquire() buy(n) lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task, args=(i, lock)) # p = Process(target=task, args=(i, )) p.start() # {'余票': 2} # 1查看了余票数, 尚有余票2张 # {'余票': 2} # 2查看了余票数, 尚有余票2张 # {'余票': 2} # 0查看了余票数, 尚有余票2张 # {'余票': 2} # 3查看了余票数, 尚有余票2张 # {'余票': 2} # 4查看了余票数, 尚有余票2张 # 1购票成功 # 2购票成功 # 没票了 # 没票了 # 没票了 进程锁总结: 加锁可以保证多个进程修改同一块数据时, 同一时间只能有一个任务可以进行修改, 即串行的修改, 没错, 速度是慢了, 但保证了数据安全. 虽然可以用文件共享数据实现进程间通信, 但问题是: 1效率低(共享数据基于文件, 而文件是硬盘上的数据). 2需要自己加锁处理 因此我们最好找一种解决方案能够兼顾: 1效率高(多个进程共享一块内存的数据) 2帮我们处理好问题, 这就是multiprocessing模块为我们提供的基于消息的IPC通信机制: 队列和管道 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。 IPC通信机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制, 比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一 部分Linux的通信方式。 5. 队列 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。 Queue([maxsize]) 创建共享的进程队列。 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁实现。 (1). 方法介绍 q = Queue([maxsize]) 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Full 异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。 q.close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。 q.cancel_join_thread() 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。 q.join_thread() 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。 (2). 队列是进程安全的, 同一时间只能一个进程拿到队列中的数据 例子: 批量生产输入放入队列, 再批量的获取结果 import os, time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + "(put):" + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print("%s%s %s" % (str(os.getpid()), "(get):", info)) if __name__ == '__main__': # windows下, 如果开启进程较多的话, 程序会崩溃, 为了防止这个问题, 使用freeze_support()方法来解决 multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) time.sleep(0.2) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) # for p in record1: # p.join() [pp.join() for pp in record1] # for p in record2: # p.join() [pp.join() for pp in record2] # 6248(get): 200(put):Wed Jan 9 23:05:38 2019 # 1568(get): 11692(put):Wed Jan 9 23:05:38 2019 # 9292(get): 6736(put):Wed Jan 9 23:05:38 2019 # 3452(get): 12136(put):Wed Jan 9 23:05:38 2019 # 6676(get): 3400(put):Wed Jan 9 23:05:39 2019 # 372(get): 2904(put):Wed Jan 9 23:05:39 2019 # 1396(get): 6352(put):Wed Jan 9 23:05:39 2019 # 1532(get): 4156(put):Wed Jan 9 23:05:39 2019 # 6868(get): 9528(put):Wed Jan 9 23:05:40 2019 # 10832(get): 8336(put):Wed Jan 9 23:05:41 2019 (3). 生产者消费者模型 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('33[45m%s 吃 %s33[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主') (3). 生产者消费者模型总结 #程序中有两类角色 一类负责生产数据(生产者) 一类负责处理数据(消费者) #引入生产者消费者模型为了解决的问题是: 平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度 #如何实现: 生产者队列<——>消费者 #生产者消费者模型实现类程序的解耦和 (4). 通过上面基于队列的生产者消费者代码示例,我们发现一个问题:主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。 解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('33[45m%s 吃 %s33[0m' %(os.getpid(),res)) def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res)) q.put(None) #在自己的子进程的最后加入一个结束信号 if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主') 注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('33[45m%s 吃 %s33[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() p1.join() #等待生产者进程结束 q.put(None) #发送结束信号 print('主') (5). 但上述解决方式,在有多个生产者和多个消费者时,由于队列我们说了是进程安全的,我一个进程拿走了结束信号,另外一个进程就拿不到了,还需要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,我们则需要用一个很low的方式去解决 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('33[45m%s 吃 %s33[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #开始 p1.start() p2.start() p3.start() c1.start() p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号 p2.join() p3.join() q.put(None) #有几个消费者就应该发送几次结束信号None q.put(None) #发送结束信号 print('主') (6). 其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制 #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中允许最大项数,省略则无大小限制。   #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法之外还具有: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() # time.sleep(random.randint(1,3)) time.sleep(random.random()) print('33[45m%s 吃 %s33[0m' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走并执行完了 def producer(name,q): for i in range(10): # time.sleep(random.randint(1,3)) time.sleep(random.random()) res='%s%s' %(name,i) q.put(res) print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res)) print('%s生产结束'%name) q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。 print('%s生产结束~~~~~~'%name) if __name__ == '__main__': q=JoinableQueue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #如果不加守护,那么主进程结束不了,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。 c2.daemon=True #开始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的所有的人任务都已经被处理完了 p2.join() p3.join() print('主') # 主进程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据 # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

原文地址:https://www.cnblogs.com/guyannanfei/p/10247635.html