并发编程(一)

一、 并发储备知识

1、1 什么是进程?

    * 正在执行的的一个过程。进程是对正在运行程序的一个抽象。

1、2 必备的理论基础

# 一、操作系统的作用
1. 隐藏丑陋复杂的硬件接口、提供良好的抽象接口
   2. 管理、调度进程,并且将多个进程对硬件的竞争变得有序
# 二、 多道技术
1. 产生背景:针对单核、实现并发
   2. 空间上的复用: 如内存中同时有多道程序
   3. 时间上的复用: 复用一个cpu的时间片
  强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切回来时能基于上次切走的位置继续运行
# 三、 现代计算机
现在的主机一般是多核,那么每个核都会利用多道技术
   有4个cpu,运行于cpu1的某个进程遇到io阻塞,会等到io结束再重新调度,会被调度到4个
   cpu中的任意一个,具体由操作系统调度算法决定。
   
# 1. 串行
一个任务完完整整地运行完毕后,才能运行下一个任务
# 2. 并发
看起来多个任务是同时运行的即可,单核也可以实现并发
# 3. 并行
真正意义上多个任务的同时运行,只有多核才实现并行

二、 操作系统发展史

2、1 为什么要有操作系统

 

现代的计算机系统主要是由一个或者多个处理器,主存,硬盘,键盘,鼠标,显示器,打印机,网络接口及其他输入输出设备组成。

一般而言,现代计算机系统是一个复杂的系统。

其一:如果每位应用程序员都必须掌握该系统所有的细节,那就不可能再编写代码了(严重影响了程序员的开发效率)

其二:并且管理这些部件并加以优化使用,是一件极富挑战性的工作,于是,计算安装了一层软件(系统软件),称为操作系统。它的任务就是为用户程序提供一个更好、更简单、更清晰的计算机模型,并管理刚才提到的所有设备。

总结:

程序员无法把所有的硬件操作细节都了解到,管理这些硬件并且加以优化使用是非常繁琐的工作,这个繁琐的工作就是操作系统来干的,有了他,程序员就从这些繁琐的工作中解脱了出来,只需要考虑自己的应用软件的编写就可以了,应用软件直接使用操作系统提供的功能来间接使用硬件。

2、2 什么是操作系统

 

精简的说的话,操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。操作系统所处的位置如图1

#操作系统位于计算机硬件与应用软件之间,本质也是一个软件。操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成,所以,单纯的说操作系统是运行于内核态的,是不准确的。

                                                                  图1

细说的话,操作系统应该分成两部分功能:

#一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节),

#二:将应用程序对硬件资源的竞态请求变得有序化
例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。

详解:

#作用一:为应用程序提供如何使用硬件资源的抽象
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制

注意:
操作系统提供给应用程序的该抽象是简单,清晰,优雅的。为何要提供该抽象呢?
硬件厂商需要为操作系统提供自己硬件的驱动程序(设备驱动,这也是为何我们要使用声卡,就必须安装声卡驱动。。。),厂商为了节省成本或者兼容旧的硬件,它们的驱动程序是复杂且丑陋的
操作系统就是为了隐藏这些丑陋的信息,从而为用户提供更好的接口
这样用户使用的shell,Gnome,KDE看到的是不同的界面,但其实都使用了同一套由linux系统提供的抽象接口


#作用二:管理硬件资源
现代的操作系统运行同时运行多道程序,操作系统的任务是在相互竞争的程序之间有序地控制对处理器、存储器以及其他I/O接口设备的分配。
例如:
同一台计算机上同时运行三个程序,它们三个想在同一时刻在同一台计算机上输出结果,那么开始的几行可能是程序1的输出,接着几行是程序2的输出,然后又是程序3的输出,最终将是一团糟(程序之间是一种互相竞争资源的过程)
操作系统将打印机的结果送到磁盘的缓冲区,在一个程序完全结束后,才将暂存在磁盘上的文件送到打印机输出,同时其他的程序可以继续产生更多的输出结果(这些程序的输出没有真正的送到打印机),这样,操作系统就将由竞争产生的无序变得有序化。

                                                 图 2

2、3 操作系统与普通软件的区别

 

1.主要区别是:你不想用暴风影音了你可以选择用迅雷播放器或者干脆自己写一个,但是你无法写一个属于操作系统一部分的程序(时钟中断处理程序),操作系统由硬件保护,不能被用户修改。

2.操作系统与用户程序的差异并不在于二者所处的地位。特别地,操作系统是一个大型、复杂、长寿的软件,

  • 大型:linux或windows的源代码有五百万行数量级。按照每页50行共1000行的书来算,五百万行要有100卷,要用一整个书架子来摆置,这还仅仅是内核部分。用户程序,如GUI,库以及基本应用软件(如windows Explorer等),很容易就能达到这个数量的10倍或者20倍之多。

  • 长寿:操作系统很难编写,如此大的代码量,一旦完成,操作系统所有者便不会轻易扔掉,再写一个。而是在原有的基础上进行改进。(基本上可以把windows95/98/Me看出一个操作系统,而windows NT/2000/XP/Vista则是两位一个操作系统,对于用户来说它们十分相似。还有UNIX以及它的变体和克隆版本也演化了多年,如System V版,Solaris以及FreeBSD等都是Unix的原始版,不过尽管linux非常依照UNIX模式而仿制,并且与UNIX高度兼容,但是linux具有全新的代码基础)

2、4 操作系统发展史

 

第一代计算机(1940~1955):真空管和插件板

第一代计算机的产生背景:

第一代之前人类是想用机械取代人力,第一代计算机的产生是计算机由机械时代进入电子时代的标志,从Babbage失败之后一直到第二次世界大战,数字计算机的建造几乎没有什么进展,第二次世界大战刺激了有关计算机研究的爆炸性进展。

lowa州立大学的john Atanasoff教授和他的学生Clifford Berry建造了据认为是第一台可工作的数字计算机。该机器使用300个真空管。大约在同时,Konrad Zuse在柏林用继电器构建了Z3计算机,英格兰布莱切利园的一个小组在1944年构建了Colossus,Howard Aiken在哈佛大学建造了Mark 1,宾夕法尼亚大学的William Mauchley和他的学生J.Presper Eckert建造了ENIAC。这些机器有的是二进制的,有的使用真空管,有的是可编程的,但都非常原始,设置需要花费数秒钟时间才能完成最简单的运算。

在这个时期,同一个小组里的工程师们,设计、建造、编程、操作及维护同一台机器,所有的程序设计是用纯粹的机器语言编写的,甚至更糟糕,需要通过成千上万根电缆接到插件板上连成电路来控制机器的基本功能。没有程序设计语言(汇编也没有),操作系统则是从来都没听说过。使用机器的过程更加原始,详见下‘工作过程’。

特点:

没有操作系统的概念 所有的程序设计都是直接操控硬件

工作过程:

程序员在墙上的机时表预约一段时间,然后程序员拿着他的插件版到机房里,将自己的插件板街道计算机里,这几个小时内他独享整个计算机资源,后面的一批人都得等着(两万多个真空管经常会有被烧坏的情况出现)。

后来出现了穿孔卡片,可以将程序写在卡片上,然后读入机而不用插件板

优点:

程序员在申请的时间段内独享整个资源,可以即时地调试自己的程序(有bug可以立刻处理)

缺点:

浪费计算机资源,一个时间段内只有一个人用。 注意:同一时刻只有一个程序在内存中,被cpu调用执行,比方说10个程序的执行,是串行的

第二代计算机(1955~1965):晶体管和批处理系统

 

第二代计算机的产生背景:

 

由于当时的计算机非常昂贵,自认很自然的想办法较少机时的浪费。通常采用的方法就是批处理系统。

特点: 设计人员、生产人员、操作人员、程序人员和维护人员直接有了明确的分工,计算机被锁在专用空调房间中,由专业操作人员运行,这便是‘大型机’。

有了操作系统的概念

有了程序设计语言:FORTRAN语言或汇编语言,写到纸上,然后穿孔打成卡片,再讲卡片盒带到输入室,交给操作员,然后喝着咖啡等待输出接口

工作过程:插图

 

第二代如何解决第一代的问题/缺点:

1.把一堆人的输入攒成一大波输入, 2.然后顺序计算(这是有问题的,但是第二代计算也没有解决) 3.把一堆人的输出攒成一大波输出

现代操作系统的前身:(见图)

优点:

批处理,节省了机时

缺点:

1.整个流程需要人参与控制,将磁带搬来搬去(中间俩小人)

2.计算的过程仍然是顺序计算-》串行

3.程序员原来独享一段时间的计算机,现在必须被统一规划到一批作业中,等待结果和重新调试的过程都需要等同批次的其他程序都运作完才可以(这极大的影响了程序的开发效率,无法及时调试程序)

第三代计算机(1965~1980):集成电路芯片和多道程序设计

第三代计算机的产生背景:

20世纪60年代初期,大多数计算机厂商都有两条完全不兼容的生产线。

一条是面向字的:大型的科学计算机,如IBM 7094,见上图,主要用于科学计算和工程计算

另外一条是面向字符的:商用计算机,如IBM 1401,见上图,主要用于银行和保险公司从事磁带归档和打印服务

开发和维护完全不同的产品是昂贵的,同时不同的用户对计算机的用途不同。

IBM公司试图通过引入system/360系列来同时满足科学计算和商业计算,360系列低档机与1401相当,高档机比7094功能强很多,不同的性能卖不同的价格

360是第一个采用了(小规模)芯片(集成电路)的主流机型,与采用晶体管的第二代计算机相比,性价比有了很大的提高。这些计算机的后代仍在大型的计算机中心里使用,此乃现在服务器的前身,这些服务器每秒处理不小于千次的请求。

如何解决第二代计算机的问题1:

卡片被拿到机房后能够很快的将作业从卡片读入磁盘,于是任何时刻当一个作业结束时,操作系统就能将一个作业从磁带读出,装进空出来的内存区域运行,这种技术叫做 同时的外部设备联机操作:SPOOLING,该技术同时用于输出。当采用了这种技术后,就不在需要IBM1401机了,也不必将磁带搬来搬去了(中间俩小人不再需要)

如何解决第二代计算机的问题2:

第三代计算机的操作系统广泛应用了第二代计算机的操作系统没有的关键技术:多道技术

cpu在执行一个任务的过程中,若需要操作硬盘,则发送操作硬盘的指令,指令一旦发出,硬盘上的机械手臂滑动读取数据到内存中,这一段时间,cpu需要等待,时间可能很短,但对于cpu来说已经很长很长,长到可以让cpu做很多其他的任务,如果我们让cpu在这段时间内切换到去做其他的任务,这样cpu不就充分利用了吗。这正是多道技术产生的技术背景

多道技术:

多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。

空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序。

时间上的复用:当一个程序在等待I/O时,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%,类似于我们小学数学所学的统筹方法(操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限。这种切换不仅会在一个进程遇到 io 时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限)

###ps
现代计算机或者网络都是多用户的,多个用户不仅共享硬件,而且共享文件,数据库等信息,共享意味着冲突和无序。

操作系统主要使用来

1.记录哪个程序使用什么资源

2.对资源请求进行分配

3.为不同的程序和用户调解互相冲突的资源请求。

我们可将上述操作系统的功能总结为:处理来自多个程序发起的多个(多个即多路)共享(共享即复用)资源的请求,简称多路复用

多路复用有两种实现方式

1.时间上的复用

当一个资源在时间上复用时,不同的程序或用户轮流使用它,第一个程序获取该资源使用结束后,在轮到第二个。。。第三个。。。

例如:只有一个cpu,多个程序需要在该cpu上运行,操作系统先把cpu分给第一个程序,在这个程序运行的足够长的时间(时间长短由操作系统的算法说了算)或者遇到了I/O阻塞,操作系统则把cpu分配给下一个程序,以此类推,直到第一个程序重新被分配到了cpu然后再次运行,由于cpu的切换速度很快,给用户的感觉就是这些程序是同时运行的,或者说是并发的,或者说是伪并行的。至于资源如何实现时间复用,或者说谁应该是下一个要运行的程序,以及一个任务需要运行多长时间,这些都是操作系统的工作。

2.空间上的复用

每个客户都获取了一个大的资源中的一小部分资源,从而减少了排队等待资源的时间。

例如:多个运行的程序同时进入内存,硬件层面提供保护机制来确保各自的内存是分割开的,且由操作系统控制,这比一个程序独占内存一个一个排队进入内存效率要高的多。

有关空间复用的其他资源还有磁盘,在许多系统中,一个磁盘同时为许多用户保存文件。分配磁盘空间并且记录谁正在使用哪个磁盘块是操作系统资源管理的典型任务。

这两种方式合起来便是多道技术

空间上的复用最大的问题是:程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,

首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。

其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。

第三代计算机的操作系统仍然是批处理

许多程序员怀念第一代独享的计算机,可以即时调试自己的程序。为了满足程序员们很快可以得到响应,出现了分时操作系统

如何解决第二代计算机的问题3:

分时操作系统: 多个联机终端+多道技术

20个客户端同时加载到内存,有17在思考,3个在运行,cpu就采用多道的方式处理内存中的这3个程序,由于客户提交的一般都是简短的指令而且很少有耗时长的,索引计算机能够为许多用户提供快速的交互式服务,所有的用户都以为自己独享了计算机资源

CTTS:麻省理工(MIT)在一台改装过的7094机上开发成功的,CTSS兼容分时系统,第三代计算机广泛采用了必须的保护硬件(程序之间的内存彼此隔离)之后,分时系统才开始流行

MIT,贝尔实验室和通用电气在CTTS成功研制后决定开发能够同时支持上百终端的MULTICS(其设计者着眼于建造满足波士顿地区所有用户计算需求的一台机器),很明显真是要上天啊,最后摔死了。

后来一位参加过MULTICS研制的贝尔实验室计算机科学家Ken Thompson开发了一个简易的,单用户版本的MULTICS,这就是后来的UNIX系统。基于它衍生了很多其他的Unix版本,为了使程序能在任何版本的unix上运行,IEEE提出了一个unix标准,即posix(可移植的操作系统接口Portable Operating System Interface)

后来,在1987年,出现了一个UNIX的小型克隆,即minix,用于教学使用。芬兰学生Linus Torvalds基于它编写了Linux

三、进程概述

3、1 进程和程序的区别

    * 正在进行的一个过程或者说是一个任务,而程序仅仅只是一堆代码

强调: 同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然是同一个软件,但是一个可以播放向往的生活,一个可以播放极限挑战。

3、2 进程的创建(了解)

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4种形式创建的新进程

1. **系统初始化**(查看进程 Linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
  1. 一个进程在运行过程中开启了子进程(如nginx开启多进程、os.fork、subprocess.Popen等)

  2. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  3. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

无论哪一种,新进程的创建都是由一个已存在的进程执行了一个用于创建进程的系统调用而创建的:

    1.  在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中。执行一个命令就会创建一个子进程)
  1. 在windows中该系统调用是:CreateProcess,CreateProcess及处理进程的创建,也负责把正确的程序装进新进程。

3、3 进程的三个基本状态

进程执行时的间断性,决定了进程可能具有多种状态。事实上,运行中的进程可能具有以下三种基本状态。

就绪状态(Ready):

进程已获得除处理器外的所需资源,等待分配处理器资源;只要分配了处理器进程就可执行。就绪进程可以按多个优先级划分队列。例如,当一个进程由于时间片用完而进入就绪状态时,排入低优先级队列;当进程有I/O操作完成而进入就绪状态时,排入搞优先级队列。

运行状态(Running):

进程占用处理器资源;处于此状态的进程的数目小于等于处理器的数目。在没有其他进程可以执行时(如所有进程都在阻塞状态),通常会自动执行系统的空闲进程。

阻塞状态(Blocked):

由于进程等待某种条件(如I/O操作或进程同步),在条件满足之前无法执行。该事件发生前即使吧处理器资源分配给该进程,也无法运行。

    * 尽量减少阻塞状态可以提升我们程序运行的效率

3、4 进程的终止

1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在Linux中用exit,在Windows中用ExitProcess)
  1. 出错退出(自愿,Python a.py 中a.py不存在)

  2. 严重错误(非自愿,执行非法指令,如引用不存在的内存,I/O等,可以捕捉异常,try...excepe...)

  3. 被其他进程杀死(非自愿,如kill-9)

3、5 进程实现并发(了解)

进程并发 = 保存状态+切换

进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)

3、6 进程的层次结构(了解)

无论UNIX还是windows,进程只有一个父进程,不同的是:

1. 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。
  1. 在Windows中,没有进程层次的概念,所有的进程都是地位相同,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。

 

四、多进程

4、1 multiprocessing模块

Python中的多线程无法利用多核优势,如果想要充分的利用多核cpu的资源(os.cpu_count()查看)。在Python中大部分情况需要使用多进程。Python提供了multiprocessing。

multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

4、2 Process类的介绍

创建进程的类:

Process([group[,target[,name[,args[,kwargs]]]]]),由该类实例化得到对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的传给target函数的位置参数,是一个元祖形式,必须有逗号

参数介绍:

group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
kwargs表示调用对象的字典,kwargs={'name':'egon','age':'18'}
name为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定 要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁。
p.is_alive():如果p任然运行,返回True
   
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

属性介绍:

  1. p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止。并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

  2. p.name:进程的名称

  3. p.pid:进程的pid

  4. p.exitcode():进程在运行是为None,如果为-N,表示被信号N结束(了解即可)

  5. p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只在具有相同的身份验证时才能成功(了解即可)

注意:在windows中Process()必须放到#if name == 'main':下

4、3 创建子进程的两种方式

第一种

from multiprocessing import Process
import time

def task(n):
   print('子进程开始')
   time.sleep(n)
   print('子进程结束')

if __name__ == '__main__':
   p = Process(target=task,args=(1,))
   p.start()  # p.start 只是向操作系统发了一个信号,请求开启子进程,操作系统具体什么时候开,开多长时间你控制不了。
   time.sleep(5) #
   print('我是主进程')

第二种

from multiprocessing import Process
import time


class MyProcesszzz(Process):
   def __init__(self,x):# 如果不传参没必要重写init
       super().__init__()
       self.n = x

   def run(self):
       print('我是子进程 start')
       time.sleep(self.n)
       print('我是子进程 end')


if __name__ == '__main__':
   p1 = MyProcesszzz(2)
   p1.start()

   print('我是主进程')

4、4 僵尸进程孤儿进程(了解)

4、4、1 僵尸进程

父进程一直不死不停地造子进程并且不回收僵尸进程(有害)

一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵尸进程。 因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息: 1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等) 2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。 任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。

4、4、2 孤儿进程

一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并且init进程对它们完成状态收集工作。

孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

主进程是什么结束的呢?

论述可能出现的情况:

情况一

1 父进程正常回收子进程 (无论父进程结束自动回收和手动回收僵尸进程均无害)

情况二

2 父进程先死了,出现了孤儿进程(可能父进程先死了 ,子进程没有死 ,子进程编程了孤儿进程由init管理回收,无害。)

情况三

3 父进程一直不死不断开启子进程,并且不手动回收。(有害)

父进程一直不死不断开启子进程,一直不发起回收僵尸进程的情况,父进程不死,init不会帮父进程回收,产生了大量的僵尸进程无人回收,占用了大量的pid,会导致其他程序无pid可用。

如何解决情况三 ?

直接杀死父进程,所有的子进程均变为子进程均变为孤儿进程,统一由init回收。

4、5 证明进程内存空间隔离

# 一个进程修改代码,肯定生效
x = 0
def task():
   global x
   x = 50
   print(x)

task()
print(x)

 

from multiprocessing import Process
# 一个进程修改代码,肯定生效
x = 0
def task():
   global x # 子进程修改的是自己的名称空间里的x,与主进程无关。
   x = 50
   print(f'子进程的x:{x}')
if __name__ == '__main__':    
   p = Process(target=task)
   p.start()
   print(f'主进程的x:{x}')

 

4、6 Process的join方法

引入:

from multiprocessing import Process
import time
def task():
   print('子进程 开始')
   time.sleep(2)
   print('子进程 结束')
if __name__ == '__main__':

   p = Process(target=task)
   p.start()
   time.sleep(5) # 主进程io了5s钟,子进程已经结束
   print('主线程')
   
   '''输出:
   
  子进程 开始
  子进程 结束
  主线程
   
  '''

   # 思考:有没有一种只智能的方式可以动态的等待子进程结束?

1 join具体用法

from multiprocessing import Process
import time
def task():
   print('子进程 开始')
   time.sleep(5)
   print('子进程 结束')
if __name__ == '__main__':
   start_time = time.time() # 记录开始时间
   p = Process(target=task)
   p.start()
   p.join() # 如遇join会阻塞该子进程,直到该子进程结束。并且join调用了wait方法释放僵尸进程。
   end_time = time.time() # 记录结束时间
   print(end_time-start_time) # 计算时间差
   print('主线程')
   
   '''输出:
   
  子进程 开始
  子进程 结束
  5.051951885223389
   
  主线程
  '''

2 思考:如果有多个join是否是串行了?

from multiprocessing import Process
import time
def task(n):
   print('子进程 开始')
   time.sleep(n)
   print('子进程 结束')
if __name__ == '__main__':
   start_time = time.time() # 记录开始时间
   p1 = Process(target=task,args=(2,))
   p2 = Process(target=task,args=(4,))
   p3 = Process(target=task,args=(6,))

   p1.start()
   p2.start()
   p3.start()
   # 换顺序也是一样的也是按照时长最长的那个计算。
   p1.join() # 等待2s
   p2.join() # 等待2s
   p3.join() # 等待2s
   end_time = time.time() # 记录结束时间
   print(end_time-start_time) # 计算时间差
   print('主线程')



   '''输出:
   
  子进程 开始
  子进程 开始
  子进程 开始
  子进程 结束
  子进程 结束
  子进程 结束
  7.566513776779175
  主线程
   
  '''

3 join串行的情况

from multiprocessing import Process
import time
def task(n):
   print('子进程 开始')
   time.sleep(n)
   print('子进程 结束')
if __name__ == '__main__':
   start_time = time.time() # 记录开始时间
   p1 = Process(target=task,args=(2,))
   p2 = Process(target=task,args=(4,))
   p3 = Process(target=task,args=(6,))

   p1.start()
   p1.join()
   p2.start()
   p2.join()
   p3.start()
   p3.join()
   end_time = time.time() # 记录结束时间
   print(end_time-start_time) # 计算时间差
   print('主线程')

   '''输出:
   
  子进程 开始
  子进程 结束
  子进程 开始
  子进程 结束
  子进程 开始
  子进程 结束
  15.407774925231934
  主线程
   
  '''
   # ps:反而不如不开进程,正常调用三次来的快。

4 精炼代码:

from multiprocessing import Process
import time
def task(n):
   print('子进程 开始')
   time.sleep(n)
   print('子进程 结束')
if __name__ == '__main__':
   start_time = time.time() # 记录开始时间
   task_list = []
   for i in range(1,4):
       p = Process(target=task,args=(i,))
       p.start()
       task_list.append(p)
   print(task_list) # [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>]
   for i in task_list:
       i.join()
   end_time = time.time() # 记录结束时间
   print(end_time-start_time) # 计算时间差 4.764175891876221
   print('主线程')

4、7 Process的其他用法(了解)

pid用法:

'''
  在当前进程查看当前进程pid
      os.getpid()
      current_process().pid
  在当前进程查看子进程pid
      子进程对象.pid()
  在当前进程查看父进程pid
      os.getppid()  
'''

from multiprocessing import Process,current_process
import time,os
def task(x):
   print('进程开始')
   print('子进程对象的pid:',os.getpid()) # 在子进程中查看自己的pid
   print('子进程对象的pid:',current_process().pid) # 在子进程中查看自己的pid
   print('子进程对象的父进程对象的pid:',os.getppid()) # 查看父进程对象的pid
   time.sleep(100) # 注意要保证子进程不死,方便在cmd下测试。
   print('进程结束')

if __name__ == '__main__':
   p = Process(target=task,args=('子进程',))
   p.start()
   print('在父进程中查看子进程的pid',p.pid) # 在父进程中查看子进程的pid
   print('主进程自己查看自己的pid:',os.getpid()) # 父进程查看自己的pid
   print('主进程自己查看自己的pid:',current_process().pid) # 父进程查看自己的pid

name的用法:

from multiprocessing import Process,current_process

def task():
   print(current_process().name) # 在子进程查看自己的name

if __name__ == '__main__':
   p = Process(target=task)
   p2 = Process(target=task)
   p3 = Process(target=task,name='rocky') # 已定义name属性为rocky
   p.start()
   p2.start()
   p3.start()
   print(p.name) #Process-1
   print(p2.name) #Process-2
   print(p3.name) #rocky

is_alive用法:

from multiprocessing import Process,current_process
import time
def task():
   print('子进程开始')
   time.sleep(1)
   print('子进程结束')

if __name__ == '__main__':
   p = Process(target=task)
   p.start()
   # p.terminate() #发送一个指令给操作系统但是不会立即结束子进程
   # time.sleep(1)
   print(p.is_alive()) # True 判断子进程代码是否结束
   time.sleep(3)
   print(p.is_alive()) # Fa

4、8 守护进程

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

例一:主进程代码运行完,守护进程立即结束

from multiprocessing import Process
import time
def foo():
   print('守护进程开始')
   time.sleep(5)
   print('守护进程开始')


if __name__ == '__main__':
   p1 = Process(target=foo)
   p1.daemon = True # 一定要凡在start之前,表示设置为一个守护进程。
   p1.start()
   print('主进程')
   
   '''输出:
  主进程
  '''
   # 守护进程一旦发现主进程代码运行完,立刻结束,并不会管自己的进程是否运行完

例二 主进程代码运行完后等在子进程运行阶段,线程不会参与守护

from multiprocessing import Process

import time
def foo():
   print('守护进程开始')
   time.sleep(5)
   print('守护进程结束')
def task():
   print('子进程开始')
   time.sleep(3)
   print('子进程开始')

if __name__ == '__main__':
   p1 = Process(target=foo)
   p2 = Process(target=task)
   p1.daemon = True # 一定要凡在start 之前
   p1.start()
   p2.start()
   # p2 = Process(target=task)
   print('主进程')

   '''输出:
  主进程
  子进程开始
  子进程开始
  '''
   #分析 守护进程在运行完主进程最后一行代码就结束,但是主进程并没有结束,主进程在等待子进程运行结束.

什么情况适合用守护进程?

当主进程代码结束,该子进程再执行无意义的情况可以用守护进程。

4、9 进程同步(锁)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的。

而共享带来的是竞争,竞争带来的结果就是错乱,而我们控制的方法,就是加锁处理。

多个进程打印同一终端

# 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os, time
def task():
   print('%s is running' % os.getpid())
   time.sleep(2)
   print('%s is done' % os.getpid())

if __name__ == '__main__':
   for i in range(3):
       p = Process(target=task)
       p.start()

运行结果:

6688 is running 4880 is running 1996 is running 6688 is done 4880 is done 1996 is done

# 并发运行变成串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os, time
def task():
   lock.acquire()
   print('%s is running' % os.getpid())
   time.sleep(2)
   print('%s is done' % os.getpid())
   lock.release()
if __name__ == '__main__':
   lock = Lock()
   for i in range(3):
       p = Process(target=task)
       p.start()

运行结果:

9088 is running 9088 is done 5460 is running 5460 is done 3892 is running 3892 is done

多个进程共享同一个文件

文件当数据库,模拟抢票

# 并发运行变成串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process, Lock
import os, time
import json
def search():
   with open('db.txt', encoding='utf-8')as f:
       data = json.load(f)
       print(f'还剩{data["count"]}张票')
def get():
   with open('db.txt', encoding='utf-8')as f:
       data = json.load(f)
   time.sleep(1)
   if data['count'] > 0:
       print(f'还剩{data["count"]}张票')
       print(f'{os.getpid()},抢到1张票')
       time.sleep(1)
       with open('db.txt', 'w', encoding='utf-8')as f:
           data['count'] -= 1
           json.dump(data, f)
   else:
       print('票已售完')
def task(lock):
   search()
   lock.acquire()
   get()
   lock.release()

if __name__ == '__main__':
   lock = Lock()
   for i in range(1, 5):#模拟5人抢票
       p = Process(target=task, args=(lock,))
       p.start()

运行结果:

还剩2张票 还剩2张票 还剩2张票 还剩2张票 还剩2张票 10012,抢到1张票 还剩1张票 6552,抢到1张票 票已售完 票已售完

4、10 队列(推荐使用)

#加锁可以保证多个进程修改同一块数据时,同一时间只有一个任务可以进行修改,即串行的修改,这样虽然牺牲了速率但保证了数据的安全
虽然可以用文件共享数据实现进程间通信,但问题是:
1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
2. 需要自己加锁处理

# 因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

理由:
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

进程之间彼此相互隔离,要实现进程间相互通信(IPC), multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现):

1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

参数介绍:

1 maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍:

主要方法:

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue。Full异常。如果blocked为False,但该Queue已满时,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
3 q.get_nowait():同q.get(False)
4 q.put_nowait():同q,put(False)
5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
7 q.size():返回对列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

 

其他方法(了解):

1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞。

2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或者异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
 
3 q.join_thread():连接对列的后台线程。此方法用于调用q.close()方法之后,等待所有队列项被取消。默认情况下,此方法有不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁用这种行为。
'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了

 

4、11 生产者消费者模型

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决大多数问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢了,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

*生产者消费者模式是通过一个容器来解决生产者和消费者强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

from multiprocessing import  Process, Queue
import time, random, os
def consumer(q):
   while True:
       res = q.get()
       time.sleep(random.randint(1,3))
       print(f'{os.getpid()}吃{res}')

def producer(q):
   for i in range(10):
       time.sleep(random.randint(1,3))
       res = '包子%s'% i
       q.put(res)
       print(f'{os.getpid()}生产了{res}')

if __name__ == '__main__':
   q = Queue()
   p1 = Process(target=producer, args=(q,))
   c1 = Process(target=consumer, args=(q,))
   p1.start()
   c1.start()
   print('主')
# 生产者消费者模型总结
# 程序中有两类角色(生产者)
  一类负责生产数据(生产者)
      二类负责处理数据(消费者)
        # 引入生产者消费者模型为了解决的问题是:
      平衡生产者与消费者之间工作能力,从而提高程序整体处理数据的速度
         # 如何实现:
      生产者<-->队列<-->消费者
         # 生产者消费者模型实现类程序的解耦合

此时的代码就有了一个问题,主进程永远不会结束,原因是:生产者p在生产结束了,但是消费者c在取空之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中在发一个结束信号,这样消费者在接收到信号后就可以break出死循环。

from multiprocessing import  Process, Queue
import time, random, os
def consumer(q):
   while True:
       res = q.get()
       if res is None: break#接收到结束信号,break出循环
       time.sleep(random.randint(1,3))
       print(f'{os.getpid()}吃{res}')

def producer(q):
   for i in range(10):
       time.sleep(random.randint(1,3))
       res = '包子%s'% i
       q.put(res)
       print(f'{os.getpid()}生产了{res}')
   q.put(None)#发送结束信号

if __name__ == '__main__':
   q = Queue()
   p1 = Process(target=producer, args=(q,))
   c1 = Process(target=consumer, args=(q,))
   p1.start()
   c1.start()
   print('主')

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送信号

from multiprocessing import  Process, Queue
import time, random, os
def consumer(q):
   while True:
       res = q.get()
       if res is None: break#接收到结束信号,break出循环
       time.sleep(random.randint(1,3))
       print(f'{os.getpid()}吃{res}')

def producer(q):
   for i in range(10):
       time.sleep(random.randint(1,3))
       res = '包子%s'% i
       q.put(res)
       print(f'{os.getpid()}生产了{res}')


if __name__ == '__main__':
   q = Queue()
   p1 = Process(target=producer, args=(q,))
   c1 = Process(target=consumer, args=(q,))
   p1.start()
   c1.start()
   p1.join()
   q.put(None)#发送结束信号
   print('主')

但上述解决方案,在有多个生产者和多个消费者时,我们则需要用一个很low的方案去解决

from multiprocessing import  Process, Queue
import time, random, os
def consumer(q):
   while True:
       res = q.get()
       if res is None: break
       time.sleep(random.randint(1,3))
       print(f'{os.getpid()}吃{res}')

def producer(name, q):
   for i in range(10):
       time.sleep(random.randint(1,3))
       res = '%s %s'% (name, i)
       q.put(res)
       print(f'{os.getpid()}生产了{res}')


if __name__ == '__main__':
   q = Queue()
   p1 = Process(target=producer, args=('包子', q))
   p2= Process(target=producer, args=('面条', q))
   p3 = Process(target=producer, args=('粥', q))

   c1 = Process(target=consumer, args=(q,))
   c2 = Process(target=consumer, args=(q,))
   p1.start()
   p2.start()
   p3.start()
   c1.start()
   c2.start()
   p1.join()#必须保证生产者全部生产完毕,才应该发送结束信号
   p2.join()
   p3.join()
   q.put(None)#有几个消费者就应该发送几次结束信号None
   q.put(None)
   print('主')

其实对于这种情况,我们有例外一种队列提供这种机制

    # JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成这个项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
   
   #参数介绍:
  maxsize是队列中允许最大项数,省略则无大小限制。
   
# 方法介绍:
   JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
   q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
   q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
   
from multiprocessing import  Process, JoinableQueue
import time, random, os
def consumer(q):
   while True:
       res = q.get()
       if res is None: break
       time.sleep(random.randint(1,3))
       print(f'{os.getpid()}吃{res}')
       q.task_done()#向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name, q):
   for i in range(10):
       time.sleep(random.randint(1,3))
       res = '%s %s'% (name, i)
       q.put(res)
       print(f'{os.getpid()}生产了{res}')


if __name__ == '__main__':
   q = JoinableQueue()
   p1 = Process(target=producer, args=('包子', q))
   p2= Process(target=producer, args=('面条', q))
   p3 = Process(target=producer, args=('粥', q))

   c1 = Process(target=consumer, args=(q,))
   c2 = Process(target=consumer, args=(q,))

   c1.daemon =True
   c2.daemon =True
   p_1 = [p1,p2,p3,c1,c2]
   for p in p_1:
       p.start()
   p1.join()
   p2.join()
   p3.join()
   q.join()# 主进程代码运行完毕--生产者运行完毕,队列也取空了--消费者没有存在的意义了。
# 主进程代码运行完毕,某个进程应该结束掉,这种情况使用守护线程会非常边便捷。
 

 

原文地址:https://www.cnblogs.com/hanyi12/p/11570323.html