47.进程

多任务

先看一下代码

#!/usr/bin/env python
#coding:utf-8
from time import sleep

def sing():
    for i in range(3):
        print("正在唱歌...%d"%i)
        sleep(1)

def dance():
    for i in range(3):
        print("正在跳舞...%d"%i)
        sleep(1)

if __name__ == '__main__':
    sing() #唱歌
    dance() #跳舞

执行结果:

正在唱歌...0
正在唱歌...1
正在唱歌...2
正在跳舞...0
正在跳舞...1
正在跳舞...2

注意

很显然刚刚的程序并没有完成唱歌和跳舞同时进行的要求

如果想要实现“唱歌跳舞”同时进行,那么就需要一个新的方法,叫做:多任务

多任务的概念

什么叫多任务呢?简单地说,就是操作系统可以同时运行多个任务。

现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?

答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。

真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

 操作系统背景知识

顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。

进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。

所以想要真正了解进程,必须事先了解操作系统

即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。

必备的理论基础:


#一 操作系统的作用: 1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口 2:管理、调度进程,并且将多个进程对硬件的竞争变得有序 #二 多道技术: 1.产生背景:针对单核,实现并发 ps: 现在的主机一般是多核,那么每个核都会利用多道技术 有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。 2.空间上的复用:如内存中同时有多道程序 3.时间上的复用:复用一个cpu的时间片 强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行

进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。

 进程的概念

第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。
  文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。 第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。   进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,
  所有多道程序设计操作系统都建立在进程的基础上。

 操作系统引入进程的概念的原因

从理论角度看,是对正在运行的程序过程的抽象;
从实现角度看,是一种数据结构,目的在于清晰地刻画动态系统的内在规律,有效管理和调度进入计算机系统主存储器运行的程序

 进程的特征

动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
并发性:任何进程都可以同其他进程一起并发执行
独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进
结构特征:进程由程序、数据和进程控制块三部分组成。
多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。

 进程与程序中的区别

程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
程序可以作为一种软件资料长期存在,而进程是有一定生命期的。
程序是永久的,进程是暂时的。

注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。

进程调度

要想多个进程交替运行,操作系统必须对这些进程进行调度,这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。

先来先服务调度算法

先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。
由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。

短作业优先调度算法

短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;
不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

时间片轮转法

时间片轮转(Round Robin,RR)的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间片轮转法中,
需要将CPU的处理时间分成固定大小的时间片,例如,几十毫秒至几百毫秒。如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,
则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。
但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。 在轮转法中,时间片长度的选取非常重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。如果时间片长度过短,则调度程序抢占处理机的次数增多。
这将使进程上下文切换次数也大大增加,从而加重系统开销。反过来,如果时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,
则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
在轮转法中,加入到就绪队列的进程有3种情况:
一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。 另一种情况是分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。 第三种情况就是新创建进程进入就绪队列。

如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。例如,
我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,
但同一队列内优先级相同。这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。

多级反馈队列

前面介绍的各种用作进程调度的算法都有一定的局限性。如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,而且如果并未指明进程的长度,
则短进程优先和基于进程长度的抢占式调度算法都将无法使用。 而多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。
在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。
该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,
第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;
如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;
如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,
在第n 队列便采取按时间片轮转的方式运行。 (3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。
如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,
即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。

进程的并行与并发

无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真正干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发

并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

并行:同时运行,只有具备多个CPU才能实现并行

单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)
 
有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,
 
一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术
 
而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算)
 
可能被分配给四个cpu中的任意一个去执行

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

 如图:

多道技术概念回顾:

内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)

同步、异步、阻塞、非阻塞

状态介绍

 在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

1)就绪(Ready)状态

  当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

(2)执行/运行(Running)状态
  当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,
  例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

 如图:

同步和异步

  所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

例子:

1. multiprocessing.Pool下的apply
#发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算还是在io阻塞,
总之就是一股脑地等任务结束  

 

所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

举例:

1. multiprocessing.Pool().apply_async()
#发起异步调用后,并不会等待任务结束才返回,相反,会立即获取一个临时结果(并不
是最终的结果,可能是封装好的一个对象)。

例子:

比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;

第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,
在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。

阻塞与非阻塞

  阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

例子:

继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,
表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。 相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,
而是一边做自己的事情一边等待。 注意:同步非阻塞形式实际上是效率低下的,想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,
这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,
而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

阻塞:进程给CPU传达一个任务之后,一直等待CPU处理完成,然后才执行后面的操作

阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,
实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。 举例: 1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止, 但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态); 2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数, 则当前线程就会被挂起,直到有数据为止。

非阻塞:进程给CPU传达任务后,继续处理后续的操作,隔段时间再来咨询之前的操作是否完成,这样的过程其实也叫轮询。

非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

同步阻塞形式

效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。

异步阻塞形式

如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,
这个人被阻塞在了这个等待的操作上面;
异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

同步非阻塞形式


 

实际上是效率低下的。
想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,
这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

 
 

异步非阻塞形式

效率更高,

因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。
比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,
那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。



  

很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞。

进程的创建与结束

进程的创建

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4种形式创建新的进程:

1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,
  称为守护进程,如电子邮件、web页面、新闻、打印) 2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等) 3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音) 4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的

创建进程:

1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件
  (在shell解释器进程中,执行一个命令就会创建一个子进程) 2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。
关于创建子进程,UNIX和windows 1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),
  任何一个进程在其地址空间中的修改都不会影响到另外一个进程。 2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。
但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。

进程的创建-fork

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

#!/usr/bin/env python
#coding:utf-8
import os

# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()

if pid == 0:
    print('哈哈1')
else:    
    print('哈哈2')

说明:

1.程序执行到os.fork()时,操作系统会创建一个新的进程(子进程),然后复制父进程的所有信息到子进程中

2.然后父进程和子进程都会从fork()函数中得到一个返回值,在子进程中这个值一定是0,而父进程中是子进程的 id号

在Unix/Linux操作系统中,提供了一个fork()系统函数,它非常特殊。

普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。

这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID

getpid()、getppid()

import os

rpid = os.fork()
if rpid<0:
    print("fork调用失败。")
elif rpid == 0:
    print("我是子进程(%s),我的父进程是(%s)"%(os.getpid(),os.getppid()))
    
else:
    print("我是父进程(%s),我的子进程是(%s)"%(os.getpid(),rpid))

print("父子进程都可以执行这里的代码")

运行结果:

我是父进程(19360),我的子进程是(19361)
父子进程都可以执行这里的代码
我是子进程(19361),我的父进程是(19360)
父子进程都可以执行这里的代码

多进程修改全局变量

#coding=utf-8
import os
import time

num = 0

# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()

if pid == 0:
    num+=1
    print('哈哈1---num=%d'%num)
else:
    time.sleep(1)
    num+=1
    print('哈哈2---num=%d'%num)

总结:

1.多进程中,每个进程中所有数据(包括全局变量)都各有拥有一份,互不影响

多次fork问题

如果在一个程序,有2次的fork函数调用,是否就会有3个进程呢?

#coding=utf-8
import os
import time

# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()
if pid == 0:
    print('哈哈1')
else:
    print('哈哈2')

pid = os.fork()
if pid == 0:
    print('哈哈3')
else:
    print('哈哈4')

time.sleep(1)

说明:

 父子进程的执行顺序:

父进程、子进程执行顺序没有规律,完全取决于操作系统的调度算法

进程的结束

   1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

  4. 被其他进程杀死(非自愿,如kill -9)

进程的层次结构

无论UNIX还是windows,进程只有一个父进程,不同的是:

    1.在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,
    该信号被送给当前与键盘相关的进程组中的所有成员。 2.在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),
    该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。

进程的状态

tail -f access.log |grep '404'

执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。

进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行

其实在两种情况下会导致一个进程在逻辑上不能运行,

    1.进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作

    2.与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

因而一个进程由三种状态

 

进程并发的实现

  进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),

每个进程占用一个进程表项(这些表项也称为进程控制块)

  该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。

在python程序中的进程操作:创建进程部分

之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块。

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

multiprocess模块

如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。

Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('子进程运行中,name=%s ,pid=%d...' % (name, os.getpid()))

if __name__=='__main__':
    print('父进程 %d.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('子进程将要执行')
    p.start()
    p.join()
    print('子进程已结束')

执行结果:

父进程 7136.
子进程将要执行
子进程运行中,name=test ,pid=4440...
子进程已结束

说明:

 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
    join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Process类介绍

 Process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的,为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
group:参数未使用,值始终为None
target:表示这个进程实例所调用对象,即子进程要执行的任务
args:表示调用对象的位置参数元组,args=(1,2,'egon',)
kwargs:表示调用对象的关键字参数字典,kwargs={'name':'egon','age':18}
name:为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法;
      如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法; p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
        如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True;判断进程实例是否还在执行; p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,
            p.join只能join住start开启的进程,而不能join住run开启的进程
            是否等待进程实例执行结束,或等待多少秒;

属性介绍

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,
      必须在p.start()之前设置 p.name:进程的名称;当前进程实例别名,默认为Process-N,N为从1开始递增的整数; p.pid:进程的pid p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,
      这类连接只有在具有相同的身份验证键时才能成功(了解即可)

在windows中使用process模块的注意事项

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,
而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。
所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。

如图:

使用process模块创建进程

创建并开启子进程的方式一

#!/usr/bin/env python
#coding:utf-8

import time
import random
from multiprocessing import Process

def piao(name):
    print "%s piaoing " % name
    time.sleep(random.randrange(1,5))
    print "%s piao end" % name

if __name__ == "__main__":
    # 实例化得到四个对象
    p1 = Process(target=piao,args=('egon',))  #必须加,号
    p2 = Process(target=piao, args=('alex',))
    p3 = Process(target=piao, args=('wupeqi',))
    p4 = Process(target=piao, args=('yuanhao',))

    # 调用对象下的方法,开启四个进程
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print ""

执行结构:

egon piaoing 
主
alex piaoing 
wupeqi piaoing 
yuanhao piaoing 
egon piao end
wupeqi piao end
yuanhao piao end
alex piao end

创建并开启子进程的方式二

创建新的进程还能够使用类的方式,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象,:

python2 :

#!/usr/bin/env python
#coding:utf-8

import time
import random
from multiprocessing import Process

class Piao(Process):

    def __init__(self,name):
        super(Piao,self).__init__()
        self.name = name

    def run(self):
        print "%s piaoing " % self.name
        time.sleep(random.randrange(1, 5))
        print '%s piao end' % self.name

if __name__ == "__main__":
    # 实例化得到四个对象
    p1 = Piao('egon')
    p2 = Piao('alex')
    p3 = Piao('wupeiqi')
    p4 = Piao('yuanhao')

    # 调用对象下的方法,开启四个进程
    p1.start()  # start会自动调用run
    p2.start()
    p3.start()
    p4.start()
    print('')

python3:

import time
import random
from multiprocessing import Process
 
class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)
 
        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)
 
if __name__ == '__main__':
    #实例化得到四个对象
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')
 
    #调用对象下的方法,开启四个进程
    p1.start() #start会自动调用run
    p2.start()
    p3.start()
    p4.start()
    print('')

 执行结果:

egon piaoing 
alex piaoing 
主
wupeiqi piaoing 
yuanhao piaoing 
egon piao end
alex piao end
wupeiqi piao end
yuanhao piao end

进程之间的内存空间是隔离的

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process

n = 100  # 在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了


def work():
    global n
    n = 0
    print '子进程内: ', n


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    print '主进程内: ', n

执行结果:

主进程内:  100
子进程内:  0

僵尸进程与孤儿进程(了解)

参考博客:http://www.cnblogs.com/Anker/p/3271773.html

一:僵尸进程(有害)

僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。详解如下

我们知道在unix/linux中,正常情况下子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。

因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:

1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息
(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)
2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,
其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。

任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。

二:孤儿进程(无害)

孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

我们来测试一下(创建完子进程后,主进程所在的这个脚本就退出了,当父进程先于子进程结束时,子进程会被init收养,成为孤儿进程,而非僵尸进程),

文件内容

#!/usr/bin/env python
#coding:utf-8


import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
# 执行pid=os.fork()则会生成一个子进程
# 返回值pid有两种值:
#    如果返回的pid值为0,表示在子进程当中
#    如果返回的pid值>0,表示在父进程当中
if pid > 0:
    print 'father died..'
    sys.exit(0)

# 保证主线程退出完毕
time.sleep(1)
print 'im child', os.getpid(), os.getppid()

执行文件,输出结果:Windows下不能实现

im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子进程已经被pid为1的init进程接收了,所以僵尸进程在这种情况下是不存在的,存在只有孤儿进程而已,孤儿进程声明周期结束自然会被init来销毁。

三:僵尸进程危害场景:

例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,

但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,

倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。

因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。

枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,

这样,这些已经僵死的孤儿进程 就能瞑目而去了。

四:测试

#1、产生僵尸进程的程序test.py内容如下
 
#coding:utf-8
from multiprocessing import Process
import time,os
 
def run():
    print('',os.getpid())
 
if __name__ == '__main__':
    p=Process(target=run)
    p.start()
     
    print('',os.getpid())
    time.sleep(1000)
 
 
#2、在unix或linux系统上执行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 1865218653
 
[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出现僵尸进程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z
 
[root@vm172-31-0-19 ~]# top #执行top命令发现1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem
 
  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                       
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                     
 
 
#3、
等待父进程正常结束后会调用wait/waitpid去回收僵尸进程
但如果父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的
解决方法一:杀死父进程
解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程
参考python2源码注释
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)
 
join方法中调用了wait,告诉系统释放僵尸进程。discard为从自己的children中剔除
 
解决方法三:http://blog.csdn.net/u010571844/article/details/50419798

查看进程与pid

python2 :

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import random


class Piao(Process):
    def __init__(self, name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        # 为我们开启的进程设置名字的做法
        super(Piao,self).__init__()
        self.name = name

    def run(self):
        print('%s is piaoing' % self.name)
        time.sleep(random.randrange(1, 3))
        print('%s is piao end' % self.name)


p = Piao('egon')
p.start()
print('开始')
print(p.pid)  # 查看pid

python3:


#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name # super().__init__() #Process的__init__方法会执行self.name=Piao-1, # #所以加到这里,会覆盖我们的self.name=name #为我们开启的进程设置名字的做法 super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('开始') print(p.pid) #查看pid

 执行结果:Windows不能实现

开始
27287
egon is piaoing
egon is piao end

查看主进程和子进程的进程号

python2:

#!/usr/bin/env python
#coding:utf-8


import os
from multiprocessing import Process

def f(x):
    print '子进程id :',os.getpid(),'父进程id :',os.getppid()
    return x*x

if __name__ == '__main__':
    print '主进程id :', os.getpid()
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

python3:

import os
from multiprocessing import Process

def f(x):
    print('子进程id :',os.getpid(),'父进程id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主进程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

执行结果:

主进程id : 27381
子进程id : 27382 父进程id : 27381
子进程id : 27383 父进程id : 27381
子进程id : 27385 父进程id : 27381
子进程id : 27386 父进程id : 27381
子进程id : 27384 父进程id : 27381

进阶,多个进程同时运行(注意,子进程的执行顺序不是根据启动顺序决定的)

 多个进程同时运行

#!/usr/bin/env python
#coding:utf-8


import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)

 执行结果:

('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')

Process对象的join方法

在主进程运行过程中如果想要并发的执行其他任务,我们可以开启子进程,此时主进程的任务和子进程的任务分为两种情况

  一种情况是:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源

  一种情况是:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要一种机制能够让主进程监测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用。

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import os

def task(name):
    print "%s is running ,parent is %s"%(name,os.getppid())
    time.sleep(1)


if __name__ == '__main__':
    p1 = Process(target=task,args=('子进程1',))
    p2 = Process(target=task, args=('子进程2',))
    p3 = Process(target=task, args=('子进程3',))
    p1.start()
    print(p1.is_alive())
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print(p1.is_alive())
    print("主进程   %s is running ,parent is %s" % (os.getpid(), os.getppid()))
    print(p1.name)

执行结果:Windows下不能实现:AttributeError: 'module' object has no attribute 'getppid'

True
子进程1 is running ,parent is 27663
子进程2 is running ,parent is 27663
子进程3 is running ,parent is 27663
False
主进程   27663 is running ,parent is 25834
Process-1

有人会有疑问,既然join是等待进程结束,那么我像下面join()下去,进程不就变成串行了的吗?

  当然不是了,必须明确join是让谁等?

详细解析如下:
 
进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
 
而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
 
join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
 
所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间

练习题:改写下面程序,分别实现下述打印效果

例子1:效果一:保证最先输出-------->4

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import random

def task(n):
    time.sleep(random.randint(1, 3))
    print '-------->%s' % n


if __name__ == '__main__':
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))

    p1.start()
    p2.start()
    p3.start()

    print '-------->4' 

例子2:效果二:保证最后输出-------->4

加上join,即可实现

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import random

def task(n):
    time.sleep(random.randint(1,3))
    print '-------->%s' % n


if __name__ == '__main__':
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))



    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
    print '-------->4'

例子3:效果三:保证按顺序输出

代码:

 
#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process import time import random def task(n): time.sleep(random.randint(1,2)) print "---------->%s" % n if __name__ =='__main__': p1 =Process(target=task,args=(1,)) p2 = Process(target=task, args=(2,)) p3 = Process(target=task, args=(3,)) '''这样写 是没有意义的,只是练习一下串行,这样写程序就成穿行的了 因为多进程的目的就是为了实现并发的效果''' p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print "---------->4"

思考题:判断上述三种效果,哪种属于并发,那种属于串行?

前两种属于并发,第三种属于串行

process对象的其他属性或方法

进程对象的其他方法一:terminate与is_alive

例子1:

#!/usr/bin/env python
#coding:utf-8


import multiprocessing
import time
import threading


def func():
    print('subprocess start',time.asctime(time.localtime(time.time())))
    time.sleep(3)
    print('subprocess end',time.asctime(time.localtime(time.time())))

if __name__ == '__main__':
    p = multiprocessing.Process(target=func, name='1号')
    p.start()
    p.terminate() # #关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p.is_alive())  # 判断这个进程是否存在
    time.sleep(5)
    print('')
    print(p.is_alive()) # 判断这个进程是否存在
    # 默认所有进程结束后,程序才结束
    print('main process end', time.asctime(time.localtime(time.time())), multiprocessing.current_process())


"""
True
主
False
main process end Tue Aug 20 14:14:29 2019 <_MainProcess(MainProcess, started)>
"""

例子2:

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import random


def task(name):
    print('%s is piaoing' % name)
    time.sleep(random.randrange(1, 5))
    print('%s is piao end' % name)


if __name__ == '__main__':
    p1 = Process(target=task, args=('egon',))
    p1.start()

    p1.terminate()  # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive())  # 结果为True
    time.sleep(3)  #如果不加这个的话,下面那条打印存活命令可能还是True
    print('')
    print(p1.is_alive())  # 结果为False

例子3:

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super(Myprocess,self).__init__()  #python 2
        #super().__init__()  #python 3

    def run(self):
        print('%s正在和网红脸聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s还在和网红脸聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #结果为True
time.sleep(3)
print('开始')
print(p1.is_alive()) #结果为False

进程对象的其他属性:name与pid

#!/usr/bin/env python
#coding:utf-8



from multiprocessing import Process
import time
import random


def task(name):
    print('%s is piaoing' % name)
    time.sleep(random.randrange(1, 5))
    print('%s is piao end' % name)


if __name__ == '__main__':
    p1 = Process(target=task, args=('egon',), name='子进程1')  # 可以用关键参数来指定进程名
    p1.start()

    print(p1.name, p1.pid, )
    
"""
子进程1 6704
egon is piaoing
egon is piao end
"""

守护进程

会随着主进程的结束而结束。主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。

关于守护进程需要强调两点:

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

 如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止

守护进程的启动

例子1:

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process
import time
import random


def task(name):
    print('%s is piaoing' % name)
    time.sleep(random.randrange(1, 3))
    print('%s is piao end' % name)


if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    p.daemon = True  # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    print('')  # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
    
"""
"""

例子2:不能用于Windows

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()  #python 3
super(Myprocess,self).__init__() #python 2 self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p.start() time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id print('')

"""
(29577, 'Myprocess-1')
哪吒正在和女主播聊天

"""
 

主进程代码执行结束守护进程立即结束

 思考下面代码的执行结果可能有那些情况,为什么?

例子:不能用于Windows

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,
#因为主进程打印main----时,p1也执行了,但是随即被终止.

"""
123
456
main-------
end456
"""

进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

互斥锁: multiprocess.Lock

通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题,如下:

#!/usr/bin/env python
#coding:utf-8

# 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os, time


def work():
    print('%s is running' % os.getpid())
    time.sleep(2)
    print('%s is done' % os.getpid())


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work)
        p.start()
        
"""
28168 is running
37668 is running
34320 is running
28168 is done
37668 is done
34320 is done
"""

如何控制,就是加锁处理。而互斥锁的意思就是互相排斥,如果把多个进程比喻为多个人,互斥锁的工作原理就是多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁,其他人才有可能有一个抢到......所以互斥锁的原理,就是把并发改成穿行,降低了效率,但保证了数据安全不错乱

使用锁维护执行顺序

#!/usr/bin/env python
#coding:utf-8

#由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire() #加锁
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release() #释放锁
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
        
        
"""
32848 is running
32848 is done
29044 is running
29044 is done
41048 is running
41048 is done
"""

上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。   

接下来,我们以模拟抢票为例,来看看数据安全的重要性

多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务

#!/usr/bin/env python
#coding:utf-8

# 文件db.txt的内容为:{"count":1},需要事先创建好
# 注意一定要用双引号,不然json无法识别
from multiprocessing import Process
import time, json


def search(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)
    print('33[43m%s 查到剩余票数%s33[0m' % (name, dic['count']))


def get(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)  # 模拟读数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)  # 模拟写数据的网络延迟
        json.dump(dic, open('db.txt', 'w'))
        print('33[46m%s 购票成功33[0m' % name)


def task(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):  # 模拟并发10个客户端抢票
        name = '<路人%s>' % i
        p = Process(target=task, args=(name,))
        p.start()

并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,卖成功给了10个人

<路人0> 查到剩余票数1
<路人1> 查到剩余票数1
<路人2> 查到剩余票数1
<路人3> 查到剩余票数1
<路人4> 查到剩余票数1
<路人5> 查到剩余票数1
<路人6> 查到剩余票数1
<路人7> 查到剩余票数1
<路人8> 查到剩余票数1
<路人9> 查到剩余票数1
<路人0> 购票成功
<路人4> 购票成功
<路人1> 购票成功
<路人5> 购票成功
<路人3> 购票成功
<路人7> 购票成功
<路人2> 购票成功
<路人6> 购票成功
<路人8> 购票成功
<路人9> 购票成功

加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

#!/usr/bin/env python
#coding:utf-8

# 把文件db.txt的内容重置为:{"count":1}
from multiprocessing import Process, Lock
import time, json


def search(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)
    print('33[43m%s 查到剩余票数%s33[0m' % (name, dic['count']))


def get(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)  # 模拟读数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)  # 模拟写数据的网络延迟
        json.dump(dic, open('db.txt', 'w'))
        print('33[46m%s 购票成功33[0m' % name)


def task(name, lock):
    search(name)
    with lock:  # 相当于lock.acquire(),执行完自代码块自动执行lock.release()
        get(name)


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):  # 模拟并发10个客户端抢票
        name = '<路人%s>' % i
        p = Process(target=task, args=(name, lock))
        p.start()

执行结果:

<路人0> 查到剩余票数1
<路人1> 查到剩余票数1
<路人2> 查到剩余票数1
<路人3> 查到剩余票数1
<路人4> 查到剩余票数1
<路人5> 查到剩余票数1
<路人6> 查到剩余票数1
<路人7> 查到剩余票数1
<路人8> 查到剩余票数1
<路人9> 查到剩余票数1
<路人0> 购票成功

互斥锁与join

使用join可以将并发变成串行,互斥锁的原理也是将并发变成串行,那我们直接使用join就可以了啊,为何还要互斥锁,说到这里我赶紧试了一下

#!/usr/bin/env python
#coding:utf-8

# 把文件db.txt的内容重置为:{"count":1}
from multiprocessing import Process, Lock
import time, json


def search(name):
    dic = json.load(open('db.txt'))
    print('33[43m%s 查到剩余票数%s33[0m' % (name, dic['count']))


def get(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)  # 模拟读数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)  # 模拟写数据的网络延迟
        json.dump(dic, open('db.txt', 'w'))
        print('33[46m%s 购票成功33[0m' % name)


def task(name, ):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):
        name = '<路人%s>' % i
        p = Process(target=task, args=(name,))
        p.start()
        p.join()

执行结果

<路人0> 查到剩余票数1
<路人0> 购票成功
<路人1> 查到剩余票数0
<路人2> 查到剩余票数0
<路人3> 查到剩余票数0
<路人4> 查到剩余票数0
<路人5> 查到剩余票数0
<路人6> 查到剩余票数0
<路人7> 查到剩余票数0
<路人8> 查到剩余票数0
<路人9> 查到剩余票数0

发现使用join将并发改成串行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时join与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行

def task(name,):
    search(name) # 并发执行
 
    lock.acquire()
    get(name) #串行执行
    lock.release()

总结:

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中

队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

信号量 —— multiprocess.Semaphore(了解)

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:

信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。
这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

例子:

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 占到一间ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

事件 —— multiprocess.Event(了解)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,
那么event.wait 方法时便不再阻塞。 clear:将“Flag”设置为False set:将“Flag”设置为True

红绿灯实例

from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
            print('33[31m红灯亮33[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
            print('33[32m车%s 看见绿灯亮了33[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                continue
            print('车开远了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
            print('33[31m红灯亮33[0m,car%s等着' % n)
            e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
            if not e.is_set():
                print('33[33m红灯,警车先走33[0m,car %s' % n)
            else:
                print('33[33;46m绿灯,警车走33[0m,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->将is_set()的值设置为False
        else:
            e.set()    # ---->将is_set()的值设置为True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 创建一个进程控制红绿灯
    t.start()

    print('============》')

进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

IPC(Inter-Process Communication) 

Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。

 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

队列

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

创建队列的类


Queue([maxsize]) 
创建共享的进程队列,Queue是多进程安全的队列,
可以使用Queue实现多进程之间的数据传递。

参数介绍:

maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现

但需要明确:

    1、队列内存放的是消息而非大数据

    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

方法:

Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,
还需要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常
  (定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常
(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,
此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)

其他方法(了解)

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,
将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,
关闭生产者中的队列不会导致get()方法返回错误。 q.cancel_join_thread() 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。 q.join_thread() 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。
调用q.cancel_join_thread()方法可以禁止这种行为。

单看队列用法


#!/usr/bin/env python
#coding:utf-8
'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
           # 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print('队列已经满了')

# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    print('队列已经空了')

print(q.empty()) #空了
 
"""
队列已经满了
True
3
3
3
队列已经空了
True
"""

上面这个例子还没有加入进程通信,只是先来看看队列为我们提供的方法,以及这些方法的使用和现象。

子进程发送数据给父进程


#!/usr/bin/env python
#coding:utf-8


import time#引入一个时间模块 from multiprocessing import Process, Queue#引入一个进程和队列模块 def f(q):#定义一个函数 q.put([time.asctime(), 'from Eva', 'hello']) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == '__main__':#定义一个函数 q = Queue() #创建一个Queue对象 p = Process(target=f, args=(q,)) #创建一个进程 p.start()#开始进程 print(q.get())#拿出一个 p.join()#感知子进程结束
 
"""
['Tue Aug 20 15:34:43 2019', 'from Eva', 'hello']
"""

上面是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:

批量生产数据放入队列再批量获取结果

例子:

 
#!/usr/bin/env python
#coding:utf-8

import os#引入操作系统模块 import time#引入时间模块 import multiprocessing#引入多元进程模块 # 向queue中输入数据的函数 def inputQ(queue):#定义一个函数 info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) #向队列中放入一个信息 # 向queue中输出数据的函数 def outputQ(queue):#取队列中的数据 info = queue.get()#取信息 print ('%s%s33[32m%s33[0m'%(str(os.getpid()), '(get):',info))#打印这个内容 # Main if __name__ == '__main__':#如果用户名是当前用户名 multiprocessing.freeze_support()# record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3)#实例化一个队列 # 输入进程 for i in range(10):#循环10个数 process = multiprocessing.Process(target=inputQ,args=(queue,))#创建一个进程 process.start()#开始这个进程 record1.append(process)#添加到列表中 # 输出进程 for i in range(10):#循环10个数 process = multiprocessing.Process(target=outputQ,args=(queue,))#创建一个进程 process.start()#开始进程 record2.append(process)#添加到列表里 for p in record1:#循环这个列表 p.join()#感知子进程结束 for p in record2:#循环这个进程 p.join()#感知子进程结束
 
"""
16132(get):39356(put):Tue Aug 20 15:37:56 2019
15564(get):2372(put):Tue Aug 20 15:37:56 2019
37060(get):20164(put):Tue Aug 20 15:37:56 2019
10652(get):24584(put):Tue Aug 20 15:37:56 2019
35256(get):3836(put):Tue Aug 20 15:37:57 2019
16608(get):7820(put):Tue Aug 20 15:37:57 2019
39380(get):33096(put):Tue Aug 20 15:37:57 2019
4000(get):9524(put):Tue Aug 20 15:37:57 2019
45612(get):8068(put):Tue Aug 20 15:37:57 2019
28896(get):35148(put):Tue Aug 20 15:37:58 2019
"""

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

 例子1:

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

改良版——生产者消费者模型

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.put(None) #发送结束信号
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

主进程在生产者生产完毕后发送结束信号None,代码如下:

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #发送结束信号
    print('')

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

 多个消费者的例子:有几个消费者就需要发送几次结束信号,代码如下:

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几次结束信号None
    q.put(None) #发送结束信号
    print('')

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制 

JoinableQueue

JoinableQueue([maxsize]) 

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

 参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。

 方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join() 
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 

下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

基于JoinableQueue队列实现消费之生产者模型

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process, JoinableQueue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))
        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走了


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))
    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨头', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 开始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('')

    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

生产者消费者模型总结

程序中有两类角色

一类负责生产数据(生产者)
 
一类负责处理数据(消费者

引入生产者消费者模型为了解决的问题是

平衡生产者与消费者之间的速度差
 
程序解开耦合

如何实现生产者消费者模型

生产者<--->队列<--->消费者

管道(了解)

创建管道的类:

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,
强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

主要方法:

 conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

其他方法:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。
          如果将timeout射成None,操作将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。
              如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,
              再也不存在任何数据,将引发EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,
              而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口
            (即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。
            如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

 pipe初使用

#!/usr/bin/env python
#coding:utf-8



from multiprocessing import Process, Pipe#引入进程模块和管道模块

def f(conn):#定义一个函数
    conn.send("Hello The_Third_Wave")#发送一条信息
    conn.close()#关闭这个进程

if __name__ == '__main__':#如果名字等于当前名称
    parent_conn, child_conn = Pipe()#接收两个参数
    p = Process(target=f, args=(child_conn,))#创建一个进程
    p.start()#启动进程
    print(parent_conn.recv())#接收一个信息
    p.join()#等待进程结束
""" Hello The_Third_Wave """

应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

引发EOFError

#!/usr/bin/env python
#coding:utf-8
#引发EOFError
from multiprocessing import Process, Pipe引入进程模块和管道模块

def f(parent_conn,child_conn):#定义一个函数传入两个参数
    #parent_conn.close() #不写close将不会引发EOFError
    while True:#循环为真
        try:#异常处理
            print(child_conn.recv())#打印接收的值
        except EOFError:#万能异常
            child_conn.close()#关闭连接

if __name__ == '__main__':#如果用户名是当前用户名
    parent_conn, child_conn = Pipe()#接受两个参数
    p = Process(target=f, args=(parent_conn,child_conn,))#实例化一个进程
    p.start()#而开始进程
    child_conn.close()#关闭客户端连接
    parent_conn.send('hello')#发送信息
    parent_conn.close()#冠词这个信息
    p.join()#等待进程结束
 

pipe实现生产者消费者模型

#!/usr/bin/env python
#coding:utf-8


from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主进程')

多个消费之之间的竞争问题带来的数据不安全问题

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主进程')

进程之间的数据共享

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。

这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

 Manager模块介绍

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Manager例子

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Manager,Process,Lock#引入进程模块,锁模块
def work(d,lock):#定义一个工作方法
    with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':#如果用户名等于当前用户名
    lock=Lock()#实例化一个锁
    with Manager() as m:
        dic=m.dict({'count':100})#传入一个字典
        p_l=[]#创建一个空列表
        for i in range(100):#循环100个数
            p=Process(target=work,args=(dic,lock))##创建一个进程
            p_l.append(p)#添加到列表里
            p.start()#开始进程
        for p in p_l:#循环列表
            p.join()#等待进程结束
        print(dic)#打印这个字典

进程数据共享

进程各自持有一份数据,默认无法共享数据

注意:该例子不能在Windows实现:


#!/usr/bin/env python
#coding:utf-8
 
from multiprocessing import Process
from multiprocessing import Manager
 
import time
 
li = []
 
def foo(i):
    li.append(i)
    print 'say hi',li
  
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()
     
print 'ending',li

执行结果:

say hi [0]
say hi [1]
say hi [2]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
ending []
say hi [9]
say hi [3]
say hi [8]

要想做到共享数据的方法有2种:

方法1:进程间数据共享方式一shared memory

注意:该例子不能在Windows实现:

from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])
 
def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print i,'----->',item
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()

执行结果:

0 -----> 100
0 -----> 22
0 -----> 33
0 -----> 44
1 -----> 100
1 -----> 101
1 -----> 33
1 -----> 44

例子2:

#!/usr/bin/env python
#coding:utf8

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    a = Process(target=f, args=(num, arr))
    p.start()
    a.start()
    p.join()
    a.join()

    print(num.value)
    print(arr[:])

执行结果:

3.1415927
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

方法2:manage.dict()共享数据

注意:该例子不能在Windows实现:

from multiprocessing import Process,Manager
 
manage = Manager()
dic = manage.dict()
 
def Foo(i):
    dic[i] = 100+i
    print dic.values()
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

执行结果:

[100]
[100, 101]

例子2:

#!/usr/bin/env python
#coding:utf-8


# !/usr/bin/env python
# coding:utf8

from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)
        
"""
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
"""

当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

进程池和multiprocess.Pool模块

进程池

为什么要有进程池?

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行:

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中的Queue

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

multiprocess.Pool模块

Pool([numprocess  [,initializer [, initargs]]]):创建进程池

参数介绍

numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer:是每个工作进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组

主要方法:

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。从进程池里取一个进程并执行
'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
apply_async(func[, args[, kwds]]) : 使⽤⾮阻塞⽅式调⽤func( 并⾏执⾏, 堵塞⽅式必须等待上⼀个进程退出才能执⾏下⼀个进程) , args为
传递给func的参数列表, kwds为传递给func的关键字参数列表;
apply(func[, args[, kwds]]): 使⽤阻塞⽅式调⽤func
close(): 关闭Pool, 使其不再接受新的任务;关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
terminate(): 不管任务是否完成, ⽴即终⽌;
join(): 主进程阻塞, 等待⼦进程的退出, 必须在close或terminate之后使⽤;

 其他方法(了解)

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,
它将在调用此方法时再次被引发。 obj.ready():如果调用完成,返回True obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

进程池和多进程效率对比

 对于纯计算型的代码 使用进程池更好

#!/usr/bin/env python
#coding:utf-8

import time import random from multiprocessing import Process,Pool def wahaha(i): print(i*i) # time.sleep(random.randint(1,5)) return i * i * '-' def ret(argv): print(argv) if __name__ == '__main__': start = time.time() #apply_async 异步 p = Pool(5) for i in range(101): # p.apply_async(func=wahaha,args=(i,),callback=ret) #向池中添加新任务 p.apply_async(func=wahaha,args=(i,)) #向池中添加新任务 p.close() #停止往池中添加新任务 p.join() # join依赖close,一个进程池必须先close(完成提交任务)再join print(time.time() - start) start = time.time() #多进程执行 p_lst = [Process(target=wahaha, args=(i,)) for i in range(101)] for p in p_lst:p.start() for p in p_lst:p.join() print( time.time()- start) ======================= 0.6436324119567871 # 8.800149202346802 #

当子进程中高IO(有阻塞)时,反而不如多进程(不科学,相对)

#!/usr/bin/env python
#coding:utf-8

import time import random from multiprocessing import Process,Pool def wahaha(i): print(i*i) time.sleep(random.randint(1,5)) return i * i * '-' def ret(argv): print(argv) if __name__ == '__main__': start = time.time() #apply_async 异步 p = Pool(5) for i in range(101): # p.apply_async(func=wahaha,args=(i,),callback=ret) #向池中添加新任务 p.apply_async(func=wahaha,args=(i,)) #向池中添加新任务 p.close() #停止往池中添加新任务 p.join() # join依赖close,一个进程池必须先close(完成提交任务)再join print(time.time() - start) start = time.time() #多进程执行 p_lst = [Process(target=wahaha, args=(i,)) for i in range(101)] for p in p_lst:p.start() for p in p_lst:p.join() print( time.time()- start) ================================= 59.929256200790405 17.863743543624878

 进程池比起多进程来说 节省了开启进程回收进程资源的时间,给操作系统调度进程降低了难度

同步和异步

进程池的同步调用:apply

#!/usr/bin/env python
#coding:utf-8

import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞 # 但不管该任务是否存在阻塞,同步调用都会在原地等着 print(res_l)

进程池的异步调用:apply_async


#!/usr/bin/env python
#coding:utf-8

import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果 # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join()
for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

进程池基础之apply和appy_async方法区别:

p = Pool(5)
p.apply #每一个任务都是排队进行
p.apply_async #每一个任务都并发执行,可设置回调函数

server端:进程池版socket并发聊天

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

client端:

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

发现:并发开启多个客户端,服务端同一时间只有4个不同的pid,只能结束一个客户端,另外一个客户端才会进来.

回调函数
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

 使用多进程请求多个url来减少网络等待浪费的时间

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

'''
打印结果:
<进程3388> get https://www.baidu.com
<进程3389> get https://www.python.org
<进程3390> get https://www.openstack.org
<进程3388> get https://help.github.com/
<进程3387> parse https://www.baidu.com
<进程3389> get http://www.sina.com.cn/
<进程3387> parse https://www.python.org
<进程3387> parse https://help.github.com/
<进程3387> parse http://www.sina.com.cn/
<进程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>
...',...}]
'''

爬虫实例

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待进程池中所有进程执行完毕

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有结果
    print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理

进程池的其他实现方式:

https://docs.python.org/dev/library/concurrent.futures.html

参考:

https://www.cnblogs.com/wj-1314/p/9039735.html

http://www.cnblogs.com/Eva-J/articles/8253549.html

原文地址:https://www.cnblogs.com/zhongguiyao/p/11049624.html