Python-并发编程(进程)

操作系统:

操作系统的三种基本类型:多道批处理系统、分时系统、实时系统。

多道-提高CPU利用率

分时-提高用户体验,降低了CPU的利用率

实时-提高用户体验

--------------------------------------------------------------------------------------------

程序员无法把所有的硬件操作细节都了解到,管理这些硬件并且加以优化使用是非常繁琐的工作,这个繁琐的工作就是操作系统来干的,有了他,程序员就从这些繁琐的工作中解脱了出来,只需要考虑自己的应用软件的编写就可以了,应用软件直接使用操作系统提供的功能来间接使用硬件。

  精简的说的话,操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。操作系统所处的位置如图

  细说的话,操作系统应该分成两部分功能:

#一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节),

#二:将应用程序对硬件资源的竞态请求变得有序化
例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。

-------------------------------------------------------------------------------------------

程序:

进程:是计算机中资源分配的最小单位,即运行的程序。

堆栈:就像列表一样,存储数据的东西

顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。

进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。

所以想要真正了解进程,必须事先了解操作系统

PS:即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。

#一 操作系统的作用:
    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序

#二 多道技术:
    1.产生背景:针对单核,实现并发
    ps:
    现在的主机一般是多核,那么每个核都会利用多道技术
    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
    cpu中的任意一个,具体由操作系统调度算法决定。
    
    2.空间上的复用:如内存中同时有多道程序
    3.时间上的复用:复用一个cpu的时间片
       强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
            才能保证下次切换回来时,能基于上次切走的位置继续运行

进程:

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。
进程调度:
先来先服务调度:
先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。

短作业有点调度:

短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

时间片轮转:

时间片轮转(Round Robin,RR)法的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间片轮转法中,需要将CPU的处理时间分成固定大小的时间片,例如,几十毫秒至几百毫秒。如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
      显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。
在轮转法中,时间片长度的选取非常重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。如果时间片长度过短,则调度程序抢占处理机的次数增多。这将使进程上下文切换次数也大大增加,从而加重系统开销。反过来,如果时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
      在轮转法中,加入到就绪队列的进程有3种情况:
      一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。
      另一种情况是分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。
      第三种情况就是新创建进程进入就绪队列。
      如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。例如,我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,但同一队列内优先级相同。这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。  

时间片轮转法

多级反馈调度:

前面介绍的各种用作进程调度的算法都有一定的局限性。如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,而且如果并未指明进程的长度,则短进程优先和基于进程长度的抢占式调度算法都将无法使用。
而多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。

(3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。

多级反馈队列

进程的并行与并发

并行 : 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )

并发 : 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

同步异步阻塞非阻塞

状态介绍(运行I/O会影响程序的运行效率)

  在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  (1)就绪(Ready)状态

  当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

  (2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

  (3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

      

同步和异步:(同步就是一步一步的做,异步是同时做不同的事情

  所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

  所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

例子:

比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;

第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。

例子

阻塞与非阻塞

阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。
相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。

注意:同步非阻塞形式实际上是效率低下的,想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

同步/异步与阻塞/非阻塞

同步阻塞:就是洗不了菜,做不了饭

异步阻塞:就是洗衣机坏了,洗不了衣服,但是可以继续做饭

同步非阻塞:就是洗衣机坏了,但是我去做饭。

异步非阻塞:整体过程,都不会停止,同时做几件事情,事情之间没有关联

  1. 同步阻塞形式

  效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。

  1. 异步阻塞形式

  如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;

  异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  1. 同步非阻塞形式

  实际上是效率低下的。

  想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

  1. 异步非阻塞形式

  效率更高,

  因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换

  比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

  

很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞

 进程的创建与结束

进程的创建

  但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

  而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程:

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

  无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。 

进程的结束

  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

  4. 被其他进程杀死(非自愿,如kill -9)

在python程序中的进程操作

     之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块。

multiprocess模块

 仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

multiprocess.process模块

import os
print(os.getpid())#进程的pid编号
print(os.getppid())  #是上面的父进程

process模块介绍

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 

属性介绍:

 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可

在Windows使用process注意事项

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候  ,就不会递归运行了。

--------------------------------------------------------------------------------------创建进程--------------------------------------------------------------------------

#multiprocess模块

#创建进程部分---Process:开启进程的两种方式,start、join、守护进程

#进程同步部分--锁,信号量,事件

#进程之间数据共享--队列,管道,Manager

#进程池部分

 

import os
from multiprocessing import Process
def func():
    print('我的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))

if __name__ == '__main__':  #这个必须有,是windows的特点,因为进程之间内存是相互隔离的
        # 子进程中没有func这个函数,要去父进程中很早func,该怎样找,在子进程中import 父进程所在的文件
        #为了避免无线开启进程所以使用__name__ == '__main__' 来进行一次判断,这样就导致,只能开一个子进程
        #没办法在子进程中,在开一个进程
    print(os.getpid(), os.getppid())
    p = Process(target=func) #创建了一个进程对象
    p.start() #开启了一个字进程

 进程与进程之间关系:父进程必须要等待子进程结束之后回收子进程的资源

                               进程与进程之间数据隔离,即父进程和子进程的内存是隔离开的

          异步的特点---并发,子进程和父进程在某段时间段一起执行

例子说明:

import os
import time
from multiprocessing import Process
n = 1
def func():
    global n
    n = 100
    print('我的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))

if __name__ == '__main__':  #这个必须有,是windows的特点,因为进程之间内存是相互隔离的
        # 子进程中没有func这个函数,要去父进程中很早func,该怎样找,在子进程中import 父进程所在的文件
        #为了避免无线开启进程所以使用__name__ == '__main__' 来进行一次判断,这样就导致,只能开一个子进程
        #没办法在子进程中,在开一个进程
    print(os.getpid(), os.getppid())
    p = Process(target=func) #创建了一个进程对象
    p.start() #开启了一个字进程
    time.sleep(2)
    print('n :' ,n )  #打印出来的是1

可以往子进程中传参数

import os
import time
from multiprocessing import Process
n = 1
def func(num):
    global n
    n = 100
    print(num)
    print('我的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))

if __name__ == '__main__':  #这个必须有,是windows的特点,因为进程之间内存是相互隔离的
        # 子进程中没有func这个函数,要去父进程中很早func,该怎样找,在子进程中import 父进程所在的文件
        #为了避免无线开启进程所以使用__name__ == '__main__' 来进行一次判断,这样就导致,只能开一个子进程
        #没办法在子进程中,在开一个进程
    print(os.getpid(), os.getppid())
    # p = Process(target=func) #创建了一个进程对象
    p =Process(target=func,args=(666,))#args里面必须传可迭代的对象,一般习惯传元祖
    p.start() #开启了一个字进程
    time.sleep(2)
    print('n :' ,n )

返回值:学习的此阶段,不能直接获取到子进程执行的函数的返回值。

启动子进程是需要时间的,如下:


import os
import time
from multiprocessing import Process
import random

def
wahahha(num): print(num) time.sleep(random.random()) print(num*'-') if __name__ == '__main__': Process(target=wahahha, args=(1,)).start() print('结束') #启动进程也是需要时间的,所在先打印出’结束‘,再打印出子进程执行的代码

该如何解决:


import os
import time
from multiprocessing import Process
import random

def
wahahha(num): print(num) time.sleep(random.random()) print(num*'-') if __name__ == '__main__': p = Process(target=wahahha, args=(1,)) p.start() p.join() #使用join,上面的代码必须分开写, p = Process(target=wahahha, args=(1,)) #一直阻塞直到这个子进程执行完毕,这样就先打印‘num’,,,最后打印‘结束’ print('结束')

进一步思考


import os
import time
from multiprocessing import Process
import random

def
wahahha(num): print(num) time.sleep(random.random()) print(num*'-') if __name__ == '__main__': # Process(target=wahahha, args=(1,)).start() # print('结束') #启动进程也是需要时间的,所在先打印出’结束‘,再打印出子进程执行的代码 for i in range(10): #开启9个子进程 p = Process(target=wahahha,args=(i,)) p.start() # p.join() 在这里加阻塞,就导致无法实现并发进程,变成了同步,顺序执行了。 # p.join() 在这里阻塞的是第9次,这样因为是并发的,所以第9次又可能很快就结束了 #所以打印的结果还是很乱 print('结束')

解决办法:多个子进程并发执行,在执行完毕之后,再做其他事,及打印‘结束’。


import os
import time
from multiprocessing import Process
import random

def
wahahha(num): print(num) time.sleep(random.random()) print(num*'-') if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=wahahha,args=(i,)) p.start() p_lst.append(p) for p in p_lst: p.join() print('结束')

第二种实现子进程的方法:面向对象开启进程:

import time
from multiprocessing import Process
class MyProcess(Process):
    def run(self):
        time.sleep(1)
        print('wahaha')
if __name__=='__main__':
    for i  in range(10):
        MyProcess().start()
#使用面对对象开启进程的方式
#必须的写一个类,必须继承Process,必须实现一个叫run的方法

 解决传参问题,使用__init__

import time
from multiprocessing import Process
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()#继承父类的init方法
        self.name=name
    def run(self):
        time.sleep(1)
        print('%s wahaha '%self.name)
if __name__=='__main__':
    for i  in range(10):
        MyProcess('aaaa').start()

守护进程

 #守护进程也是一个子进程

会随着主进程的结束而结束。

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

import time
from multiprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)
if __name__ == '__main__':
    Process(target=func).start()
    time.sleep(3)
    print('主进程')
#子进程0.5秒汇报一次,这个代码思考可以实现监控,问题:子进程一直会执行下去,但实际是父进程已经结束

解决办法:守护进程

import time
from multiprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)
if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True #必须放在start之前,默认是False,设置为True之后,这个p进程就成了守护进程
    p.start()
    time.sleep(3)
    print('主进程')
#主进程的<代码>执行结束之后守护进程自动结束
import time
from multiprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)
def wahaha():
    i = 0
    while i<5:
        print('第%s秒'%i)
        time.sleep(1)
        i +=1
if __name__ == '__main__':
    p2 = Process(target=wahaha)
    p2.start()
    p = Process(target=func)
    p.daemon = True #必须放在start之前,默认是False,设置为True之后,这个p进程就成了守护进程
    p.start()
    time.sleep(3)
    print('主进程')
    p2.join()#进行阻塞,使得子进程结束之后,再结束主进程;
            #这个守护进程会等待p2执行结束之后,才结束守护

Process的其他方法:销毁一个进程

import time
from multiprocessing import Process
def wahaha():
    i = 0
    while i <5:
        print('第%s秒'%i)
        time.sleep(1)
        i +=1
if __name__ == '__main__':
    p = Process(target=wahaha)
    p.start()
    time.sleep(3)
    print('主进程')
    print(p.is_alive())#p.is_alive()判断子进程是否还存在
    p.terminate()#结束一个子进程,这个距离操作系统去执行关闭一个进程,会有一个时间
                #所以下面的p.is_alive()还是True
    print(p.is_alive())

进程同步(multiprocess.Lock)

锁 —— multiprocess.Lock

有10个进程,同一段代码,多个进程并发执行控制某段代码在同一时刻只能有一个进程执行,这就用到了锁

 

  怎么用呢?

#
#为了避免同一段代码被多个进程同时执行
from multiprocessing import Lock
lock = Lock() #创建一个锁  只有一把锁,只能一个人用
lock.acquire() #拿到钥匙,想要开锁
print('拿到钥匙')
lock.release()#还钥匙

 例子说明:

import json
import time
from multiprocessing import Process
from multiprocessing import Lock
def search(i):
    f= open('db')
    ticket_dic=json.load(f)
    f.close()
    print('%s正在查票,剩余票数:%s'%(i,ticket_dic['count']))
def buy(i):
    with open('db') as f:
        ticket_dic=json.load(f)
        time.sleep(0.2)
    if ticket_dic['count']>0:
        ticket_dic['count']-=1
        print('%s买到票了'%i)
        time.sleep(0.2)
        with  open('db','w') as f:
            json.dump(ticket_dic,f)
def get_ticket(i,lock):
    search(i)
    lock.acquire()#拿钥匙,开启buy函数的锁,可以理解为锁随钥匙走
    buy(i)
    lock.release()
if __name__ == '__main__':
    lock =Lock()#创建锁,这个锁在这里创建
    for i in range(10):
        p = Process(target=get_ticket,args = (i,lock))
        p.start()
#维护数据安全,但降低了程序的效率
#所有的效率都是建立在数据安全的基础上的
#但凡涉及到并发编程都需要考虑数据的安全性
# 我们需要再并发部分对数据修改的操作格外小心
#如果会涉及到数据的不安全,就需要进行加锁控制

信号量:

#把代码放在屋子里,放一串钥匙,可以自己定义钥匙的数量,谁先到,谁先拿到钥匙,执行,这个原理就是信号量

 

验证

from multiprocessing import Semaphore
sem = Semaphore(3)#放几把钥匙
sem.acquire()  #取钥匙
print(1)
sem.acquire()
print(2)
sem.acquire()
print(3)
sem.acquire()
print(4) #无法打印出,因为就3把钥匙
sem.release()#需要释放资源,这样就能获得
print(5)#能够打印

例子:只有4个包间的ktv,10个进程,及人来轮流使用

#信号量是由锁+计数器来实现的
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
def ktv(sem,i):
    sem.acquire()#拿钥匙
    print('%s走进ktv'%i)
    time.sleep(random.randint(1,3))
    print('%s走出ktv'%i)
    sem.release()
if __name__ == '__main__':
    sem = Semaphore(4) #在主进程创建信号量,4把钥匙
    for i in range(10):
        p = Process(target=ktv,args=(sem,i))
        p.start()

上下文管理,帮助减少代码

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
# def ktv(sem,i):
#     sem.acquire()#拿钥匙
#     print('%s走进ktv'%i)
#     time.sleep(random.randint(1,3))
#     print('%s走出ktv'%i)
#     sem.release()
# 使用上下文管理来节省代码,自动获取钥匙,自动归还
def ktv(sem,i):
    with sem:#拿钥匙
        print('%s走进ktv'%i)
        time.sleep(random.randint(1,3))
        print('%s走出ktv'%i)

if __name__ == '__main__':
    sem = Semaphore(4) #在主进程创建信号量,4把钥匙
    for i in range(10):
        p = Process(target=ktv,args=(sem,i))
        p.start()

事件

#事件:控制子进程执行还是阻塞的一个机制
#通过wait方法
    #在事件中有一个标志,如果这个标志是True,这个wait方法执行效果就是pass,直接执行
    #如果这个标志是False,wait方法的效果就是阻塞,直到这个标志标称True
#控制标志
    #判断标志的状态 is_set
    #set方法,将标志设置为True
    #clear方法,将标志设置为False
from multiprocessing import Event
e = Event()
print(e.is_set()) #False
e.wait()#进行阻塞,事件在创建之初标志的状态是false
from multiprocessing import Event
e = Event() #将标志改为True
e.set()
print(e.is_set())
e.wait()#当标志为True时,不阻塞

例子:

import time
from multiprocessing import Event,Process
def func1(e):
    print('start func1')
    e.wait() #e.wait(1),里面可以写参数,代表如果是False,我阻塞1s之后,直接执行下面代码,但是这个标志,还是False,没有改变
    print('end func1')
if __name__=='__main__':
    e = Event()
    p = Process(target=func1,args=(e,))
    p.start()
    time.sleep(3)
    e.set() #写在这里,子进程也能拿到

 红绿灯例子

#红绿灯
import time
import random
from multiprocessing import Process,Event
print('33[1;31m红灯亮33[0m')
def traffic_light(e):
    while True:
        time.sleep(2)
        if e.is_set():
            print('33[1;31m红灯亮33[0m')
            e.clear()
        else:
            print('33[1;32m绿地亮33[0m')
            e.set()
def car(i,e):
    if not e.is_set():
        print('%s正在等待通过'%i)
        e.wait()
    print('%s通过' % i)
if __name__=='__main__':
    e = Event()
    light = Process(target=traffic_light,args=(e,))
    light.daemon =True #放在被设置为守护进程的start之前
    light.start()
    car_lst = []
    for i in range(10):
        p = Process(target=car,args=(i,e))
        p.start()
        time.sleep(random.randint(0,3))
        car_lst.append(p)
    for car in car_lst:
        car.join()

进程间通信——队列(multiprocess.Queue)

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递 

IPC:进程之间的通信,就叫做IPC(Inter-Process Communication),实现IPC通信有两个方法,一个是队列,一个是管道。

Queue([maxsize]) 
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。
Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

方法介绍
q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

其他方法(了解)

例子

from multiprocessing import Queue
q = Queue()
q.put(1)
q.put(2)
print(q.get())  #先进先出
print(q.get())

例子说明

from multiprocessing import Queue
q = Queue(5)#添加数字代表只能有5个值,如果不添加代表的是无线
            #如果队列已经满了,还是添加数据,就会卡住不动,这是针对单进程来说的
            #多进程的话,可以等一个进程把数值拿走,之后才能放,继续进行下去
print(q.empty())#判断这个队列是否为空
print(q.full())#判断这个队列是否满了
q = Queue(5)
q.put(1)
q.put('aa')
q.put([1,2,3])
q.put({'k':'v'}) 
q.put({'k','v'})
print(q.empty())
print(q.full())
q.put((1,2,3)) #放不进去

 get的例子

from multiprocessing import Queue
#这些在多进程里 empty,get,获取的值都不准,因为进程之间异步导致
q = Queue(3)
# q.get()#因为没有数据,会一直等,卡在这里,程序进行不下去了
# q.get_nowait() #这个没有数据,不会等,但因为没有数据可取,会报错
try:
    q.get_nowait()
except:
    print('队列中没有值')

put例子

from multiprocessing import Queue
q = Queue(3)
q.put(1)
q.put('aa')
q.put([1,2,3])
# q.put((1,2,3))#这个放不进去,程序会卡在这里
# q.put_nowait((1,2,3)) #如果已经满了,会报错,可以用异常处理try来处理,但是这个要放进去的数据就会丢了
try:
    q.put_nowait((1,2,3))
except:
    print('丢失了一个数据')

方法总结:

#多进程的情况下,不准:
#empty
#full
#会引起程序的阻塞:
#get
#put
#不会引起程序的阻塞,但是程序会报错
#get_nowait
#put_nowait #容易丢失数据

例子,父子进程之间通信

from multiprocessing import Process,Queue
def func(num,q):
    q.put(num**num)  #把结果放在队列里,队列能够在进程之间通信,实现return的方法
if __name__=='__main__':
    q = Queue()
    p = Process(target=func, args=(10,q))
    p.start()
    print(q.get())

多进程之间如果获取

from multiprocessing import Process,Queue
def func(num,q):
    q.put({num:num**num})  #把结果放在队列里,队列能够在进程之间通信,实现return的方法
if __name__=='__main__':
    q = Queue()
    for i in range(10):
        p = Process(target=func,args=(i,q))
        p.start()
    for i in  range(10):
        print(q.get())

基于多进程实现的TCP并发

server端:

#基于多进程实现的TCP并发
import socket
from multiprocessing import Process
def con():
    while 1:
        # mag = input('>>>')#会报错,子进程里面不允许有input
        conn.send(b'hello')
        print(conn.recv(1024))
    conn.close()
if __name__ == '__main__':
    sk = socket.socket()
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    #这句代码的作用,1,可以避免出现程序中断,再重新开启时,报出端口已占用的问题,实际代码未到
    #sk.close(),端口不会释放,所以端口并未被占用。
    sk.bind(('127.0.0.1',8090))
    sk.listen(5)
    try:
        while True:
            conn,addr =sk.accept()
            Process(target=con,args=(conn,)).start()
    finally:#及时代码报错,也会执行下面的代码
        sk.close()

客户端:

import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8090))
while 1:
    print(sk.recv(1024))
    msg = input('>>>').encode('utf-8')
    sk.send(msg)
sk.close()

生产者消费者模型

简单的例子

import time
import random
from multiprocessing import Queue,Process
def consumer(q):
    while True:
        obj = q.get()
        print('消费了一个数据%s'%obj)
        time.sleep(random.randint(1,3))
if __name__=='__main__':
    q = Queue()
    Process(target=consumer,args=(q,)).start()
    for i in range(10):
        time.sleep(random.randint(1,5))
        q.put('food%s'%i)
        print('生产了一个数据food%s'%i)

#上面的代码,结束不了,且生产者和消费者速度不一致

import time
import random
from multiprocessing import Queue,Process
def consumer(name,q):
    while True:
        obj = q.get()
        if obj is None:break#帮助结束consumer代码print('%s吃了一个%s'%(name,obj))
        time.sleep(random.randint(1,3))
def producer(name,food,q):
    for i in range(10):
        time.sleep(random.randint(1,5))
        q.put('%s生产的%s%s'%(name,food,i))
        print('%s生产了一个数据%s%s'%(name,food,i))

if __name__=='__main__':
    q = Queue()
    Process(target=consumer,args=('haha',q)).start()
    p1 = Process(target=producer,args=('lala','包子',q))
    p1.start()
    p2 = Process(target=producer,args=('hehe','饺子',q))
    p2.start()
    p1.join()
    p2.join()#进行阻塞,使得所有的数据都生产完,放在队列中-->#什么时候所有的数据就都生产完了
    q.put(None)#使得consumer能够判断,是否吃完了-->#如果所有生产出来的数据都被消费了,consumer就可以结束了

如果有两个或者多个消费者,那么上面的代码就不行了,那么怎么解决呢?有几个consumer就加几个q.put(None)

import time
import random
from multiprocessing import Queue,Process
def consumer(name,q):
    while True:
        obj = q.get()
        if obj is None:break#帮助结束consumer代码print('%s吃了一个%s'%(name,obj))
        time.sleep(random.randint(1,3))
def producer(name,food,q):
    for i in range(10):
        time.sleep(random.randint(1,5))
        q.put('%s生产的%s%s'%(name,food,i))
        print('%s生产了一个数据%s%s'%(name,food,i))

if __name__=='__main__':
    q = Queue()
    Process(target=consumer,args=('haha',q)).start()
    Process(target=consumer, args=('jiji', q)).start()
    p1 = Process(target=producer,args=('lala','包子',q))
    p1.start()
    p2 = Process(target=producer,args=('hehe','饺子',q))
    p2.start()
    p1.join()
    p2.join()#进行阻塞,使得所有的数据都生产完,放在队列中-->#什么时候所有的数据就都生产完了
    q.put(None)#使得consumer能够判断,是否吃完了-->#如果所有生产出来的数据都被消费了,consumer就可以结束了
    q.put(None)#有几个consumer就添加几个q.put(None)

JoinableQueue队列的用法

 主要是多了一个q.join()

# 这个代码最终会阻塞在q.join
import time
from multiprocessing import JoinableQueue,Process
def consumer(q):
    while True:
        print(q.get())
        time.sleep(0.3)
        q.task_done()#通知队列一个数据已经被处理完了
if __name__=='__main__':
    q = JoinableQueue()
    Process(target=consumer,args=(q,)).start()
    for i in range(10):
        q.put(i)
    q.join()#看的不是get的操作,是task_done
            #join标志所有的数据都被取走且被处理完才结束阻塞
    print('所有的数据都被消费完了')

 但是上面的代码还是有问题,因为消费者while True,无线循环,无法终止,所以还要再改

加上守护进程

import time
from multiprocessing import JoinableQueue,Process
def consumer(q):
    while True:
        print(q.get())
        time.sleep(0.3)
        q.task_done()#通知队列一个数据已经被处理完了
if __name__=='__main__':
    q = JoinableQueue()
    c = Process(target=consumer,args=(q,))
    c.daemon =True
    c.start()
    for i in range(10):
        q.put(i)
    q.join()#看的不是get的操作,是task_done
            #join标志所有的数据都被取走且被处理完才结束阻塞
    print('所有的数据都被消费完了')

 使用JoinableQueue

#JoinableQueue
import time
import random
from multiprocessing import JoinableQueue,Process
def consumer(name,q):
    while True:
        obj = q.get()
        print('%s吃了一个%s'%(name,obj))
        time.sleep(random.randint(1,3))
        q.task_done()
def producer(name,food,q):
    for i in range(10):
        time.sleep(random.randint(1,5))
        q.put('%s生产的%s%s'%(name,food,i))
        print('%s生产了一个数据%s%s'%(name,food,i))

if __name__=='__main__':
    q = JoinableQueue()
    c1 = Process(target=consumer,args=('haha',q))
    c1.daemon = True
    c1.start()
    c2 =Process(target=consumer, args=('jiji', q))
    c2.daemon= True
    c2.start()
    p1 = Process(target=producer,args=('lala','包子',q))
    p1.start()
    p2 = Process(target=producer,args=('hehe','饺子',q))
    p2.start()
    p1.join()
    p2.join()
    q.join()

 队列:维护了一个先进先出的顺序,且保证了数据在进程之间的安全

 管道:

from multiprocessing import Pipe
lp,rp = Pipe()#创建一个管道,会拿到管道的两端
lp.send([1,2,3]) #两个一个发,也可以收,一个收,也可以发,这个传什么数据都行,是双向通信的
print(rp.recv())
rp.send('123)
print(lp.recv())

例子说明

from multiprocessing import Pipe,Process
def consumer(lp,rp):
    print(rp.recv())

if __name__=='__main__':
    lp,rp =Pipe()
    Process(target=consumer,args=(lp,rp)).start()
    lp.send('hello')

 生成者消费者模型:

#模拟生产者消费者模型,下面的代码会因为while循环,而不会终止
from multiprocessing import Pipe,Process
def consumer(lp,rp):
    while True:
        print(rp.recv())

if __name__=='__main__':
    lp,rp =Pipe()
    Process(target=consumer,args=(lp,rp)).start()
    for i in range(10):
        lp.send('food%s'%i)

解决办法:

 

from multiprocessing import Pipe,Process
def consumer(lp,rp):
    lp.close()
    while True:
        try:
             print(rp.recv())
        except EOFError:break

if __name__=='__main__':
    lp,rp =Pipe()
    Process(target=consumer,args=(lp,rp)).start()
    rp.close()
    for i in range(10):
        lp.send('food%s'%i)
    lp.close()

应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭,这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

管道的缺点:管道是数据不安全的,因为他没有锁,一个数据可能被多个进程取走

而队列实现的机制是:管道+锁

进程之间数据共享

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。

这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

Manager模块介绍

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    # with Manager() as m:
    #     dic=m.dict({'count':100})
    #     p_l=[]
    #     for i in range(100):
    #         p=Process(target=work,args=(dic,lock))
    #         p_l.append(p)
    #         p.start()
    #     for p in p_l:
    #         p.join()
    #     print(dic)
    m =Manager()
    dic=m.dict({'count':100})#有m创建的字典,可以在进程之间传送
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=(dic,lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(dic)

进程池

多进程---适合处理高计算型的程序

进程的个数:CPU的个数,或者CPU的个数+1

池:默认的是CPU的个数+1

为什么要有进程池?进程池的概念。

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

Pool([numprocess  [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
#方法
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
6    
7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
8 
9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
#其他方法
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
3 obj.ready():如果调用完成,返回True
4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
5 obj.wait([timeout]):等待结果变为可用。
6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
Pool的使用

 例子

from multiprocessing import Pool#使用Pool来开启进程
def wahaha(i):
    print(i**i)
if __name__ == '__main__':
    p =Pool(5)#开启有5个进程的池子
    for i in range(100):
        p.apply_async(func=wahaha,args=(i,))#相当于提交了一个任务,异步提交,主进程与子进程异步了,如果没有close()和join(),无法打印
    p.close()#关闭进程池,防止进一步操作,如果所有操作持续挂起,他们将在工作进程终止前完成
                #这个不是进程池不工作了,而是所有工作都提交完毕了
    p.join()#等待所有工作进程退出,此方法只能在close()和teminate()之后调用

进程池和多进程计算速度对比:下面这个代码是进程池块

import time
from multiprocessing import Pool,Process#使用Pool来开启进程
def wahaha(i):
    print(i**i)
if __name__ == '__main__':
    start = time.time()
    p =Pool(5)#开启有5个进程的池子
    for i in range(100):
        p.apply_async(func=wahaha,args=(i,))#相当于提交了一个任务,异步提交,主进程与子进程异步了
    p.close()#关闭进程池,防止进一步操作,如果所有操作持续挂起,他们将在工作进程终止前完成
                #这个不是进程池不工作了,而是所有工作都提交完毕了,阻止往池中添加新的任务
    p.join()#等待所有工作进程退出,此方法只能在close()和teminate()之后调用,join依赖close
    print('-->>>',time.time() - start)
    start=time.time()
    p_lst=[Process(target=wahaha,args=(i,)) for i in range(101)]
    for p in p_lst:p.start()
    for p in p_lst:p.join()
    print('-->>>',time.time()-start)

如果加上阻塞,那么多进程块,对待不同的程序,使用不同的代码

import time
from multiprocessing import Pool,Process
def wahaha(i):
    time.sleep(5)
    print(i**i)
if __name__ == '__main__':
    start = time.time()
    p =Pool(5)#开启有5个进程的池子
    for i in range(100):
        p.apply_async(func=wahaha,args=(i,))
    p.close()
    p.join()
    print('-->>>',time.time() - start)
    start=time.time()
    p_lst=[Process(target=wahaha,args=(i,)) for i in range(101)]
    for p in p_lst:p.start()
    for p in p_lst:p.join()
    print('-->>>',time.time()-start)

总结:对于纯计算型的,使用进程池更好;对于高IO的代码,使用多进程更好

p.apply和p.apply_async的区别

import time
from multiprocessing import Pool,Process
def wahaha(i):
    time.sleep(0.5)
    print(i**i)
if __name__ == '__main__':
    start = time.time()
    p =Pool(5)
    for i in range(100):
        p.apply_async(func=wahaha,args=(i,))#异步提交了一个任务
    p.close()
    p.join()

    p = Pool(5)
    for i in range(100):
        p.apply(func=wahaha, args=(i,)) #同步提交了一个任务
    p.close()
    p.join()

进程池的效果

import time
import random
from multiprocessing import Pool,Process
def wahaha(i):
    time.sleep(random.randint(1,5))#异步提交,一下子先执行5个进程,之后谁先执行完了,然后再接入任务,开始执行
    print(i**i)                     
if __name__ == '__main__':
    start = time.time()
    p =Pool(5)
    for i in range(100):
        p.apply_async(func=wahaha,args=(i,))#异步提交了一个任务
    p.close()
    p.join()

p.map()的用法

from multiprocessing import Pool,Process
def wahaha(i):
    print(i**i)
if __name__ == '__main__':
    # p = Pool(5)
    # for i in range(100):
    #     p.apply_async(func=wahaha,args=(i,))
    # p.close()
    # p.join()
    p=Pool(5)#map这个代码实现的效果上面的效果
    p.map(func=wahaha,iterable=range(101))#接受一个任务函数,和一个iterable,相当于省略了for循环内部默认实现apply_async()异步处理,
                                        # 并自带close()和join(),只是一种简便的写法

那么p.map()和p.apply_async()之间有什么区别呢?map无法取回返回值,p.apply_async()可以通过get()获取到函数的返回值

import random
import time
from multiprocessing import Pool,Process
def wahaha(i):
    print(i**i)
    time.sleep(random.randint(1,3))
    return i*i*'>'
if __name__ == '__main__':
    p = Pool(5)
    result_lst=[]
    for i in range(100):
        r = p.apply_async(func=wahaha,args=(i,))
        result_lst.append(r)
        # print(r)#打印出来的是个地址
        # print(r.get()) #能够获取到函数wahaha的返回值,但是代码会由异步变成同步的
    for r in result_lst:print(r.get())#使用一个列表,当进程执行完后,我再取值,就不会出现同步问题
    p.close()
    p.join()

回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

例子

from multiprocessing import Pool
def wahaha(num):
    return num**num
def call(argv):#这个argv是wahaha函数的返回值
    print(argv)
if __name__ == '__main__':
    p = Pool(5)
    p.apply_async(func=wahaha,args=(50,),callback=call)#callbakc是回调函数,接受一个函数地址
    p.close()
    p.join()

回调函数使用的是主进程的资源

import os
from multiprocessing import Pool
def wahaha(num):
    print('子进程', os.getpid())
    return num**num
def call(argv):#这个argv是wahaha函数的返回值。回调函数用的是主进程中的资源。
    print(os.getpid())
    print(argv)
if __name__ == '__main__':
    print('主进程',os.getpid())
    p = Pool(5)
    p.apply_async(func=wahaha,args=(50,),callback=call)#callbakc是回调函数,接受一个函数地址
    p.close()
    p.join()
#Pool中的回调函数,使用的是主进程的资源
原文地址:https://www.cnblogs.com/xiao-xuan-feng/p/14347299.html