进程实际操作篇2

温故而知新

操作系统

"""
操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序
"""

多道技术

"""
单核实现并发的效果

串行: 在一条时间线上执行的,下一个进程的运行要等上一个进程运行完
并发: 看起来像是同时运行多个任务的就可以叫做并发
并行: 真正意义上同时运行多个任务

ps: 微博上的明星出轨  -- > 星轨

空间上和时间上的复用
空间上的复用:
	多个程序公用一套计算机硬件
时间上的复用:
	切换 + 保存的状态
"""

# 切换分为俩种情况
# 1.当一个程序遇到I/O操作的时候,操作系统会立刻剥夺该程序的cpu执行权限,去给下一个程序使用cpu,这是提高了cpu的利用率并且不影响程序的执行效率
# 2. 当一个程序运行的时间很长时,操作系统也会强行调走cpu的执行权限,去执行下一个程序, 这是降低了程序的运行的效率的 运行的时间 + 切换的时间  但是实现了单核并发的效果

进程

"""
程序就是一堆死代码	"死的"
进程则是正在执行的过程	"活的"
"""
# 进程的调度算法
"""
先来先服务 调度算法
短作业优先调度算法
时间片轮转法 + 多级反馈队列  (现在使用多的) 
"""

进程运行的三状态图

"""
就绪态: 一切程序必须要先过就绪态才能加入运行态
运行态: 正在被cpu运行
阻塞态: 程序遇到IO操作了
理想: 我们希望我们开发的程序一直处于就绪态和运行态之间
"""

俩对重要概念

  • 同步和异步

"""任务的提交方式"""
同步:	
    任务提交之后=原地等待任务的返回结果期间不做任何事
    
异步:
    任务提交之后不原地等待任务的返回结果执行,执行下一行代码
    	结果由异步回调机制做处理
  • 阻塞非阻塞

    """程序的运行状态"""
    阻塞: 阻塞态
    非阻塞态: 就绪态, 运行态
    

开启进程的俩种方式

from mutiprocessing import Process

# 1. 类实例化产生对象
# 2. 类的继承 run方法

# 在windows 里面开启进程的代码一定要写在main代码内
创建一个进程就是在内存空间种申请了一块内存空间讲需要的代码丢进去

join方法

"""
主进程等待子进程代码运行完毕后再往下执行代码
"""

进程间数据是相互隔离的(默认情况下)

ps: 人工智能相关参考网站

作为一名python程序员当你遇到一个功能的时候,第一时间你可以考虑是否有对应的模块已经帮你实现了该功能

pycharm过期时,可下载最新的版本,到这个网站:http://idea.medeming.com/jets/拿取激活码

今日内容详细

进程对象及其他用法

"""
一台计算机上面运行着很多进程,那么计算机是如何区分并管理这些进程服务端的呢?
计算机会给每一个运行的进程分配一个pid号
如何查看
windows电脑
	进入cmd输入 tasklist 即可 查看
	tasklist | findstr PID 号 即可查看具体的进程
mac电脑
	进入终端之后输入ps aux
	ps aux | grep PID 查看具体的进程

import os
os.getpid()  # 查看当前进程的进程号
os.getppid()  # 查看当前进程的父进程号

# 是告诉操作系统帮你企业杀死当前进程 但是需要一定的时间 而代码执行的运行速度很快
p.terminate()  # 杀死当前进程
time.sleep(0.1)
print(p.is_alive())  #  判断当前进程是否存活
"""

僵尸进程与孤儿进程(了解)

# 僵尸进程
"""
死了但是还没死透
当你开设了子进程之后, 该进程死后不会立刻释放占用的进程号
因为我要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间...
所有的进程都会步入僵尸进程
	父进程不死,并且在无限制的创建子进程并且子进程也不会结束
	回收子进程占用的pid号
		父进程等待子进程运行结束
		父进程调用join方法
"""

# 孤儿进程
"""
子进程存活,父进程意外死亡
这个时候,本来是没有人来给子进程结束后回收pid号了
但是 操作系统会开设一个"儿童福利院"专门管理孤儿进程回收相关资源
"""

有个详细的博客介绍: https://www.cnblogs.com/Anker/p/3271773.html

1、基本概念

  我们知道在unix/linux中,正常情况下,子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程 到底什么时候结束。 当一个 进程完成它的工作终止之后,它的父进程需要调用wait()或者waitpid()系统调用取得子进程的终止状态。

  孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

  僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。

2、问题及危害

  unix提供了一种机制可以保证只要父进程想知道子进程结束时的状态信息, 就可以得到。这种机制就是: 在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。 但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)。直到父进程通过wait / waitpid来取时才释放。 但这样就导致了问题,如果进程不调用wait / waitpid的话, 那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。

  孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

  任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个 子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。

  僵尸进程危害场景:

  例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。

僵尸进程(存在有害的可能) 孤儿进程(无害)

代码测试 在linux或者unix中

1、产生僵尸进程的程序test.py内容如下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print('子',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()

    print('主',os.getpid())
    time.sleep(1000)

2、在unix或linux系统上执行 --- > 命令以及执行结果

[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出现僵尸进程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #执行top命令发现1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin        

3、

  • 等待父进程正常结束后会调用wait/waitpid去回收僵尸进程
  • 但如果父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的
  • 解决方法一:杀死父进程
  • 解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程
参考python2源码注释
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中调用了wait,告诉系统释放僵尸进程。discard为从自己的children中剔除

思考: 在子进程join之后能否看到子进程p的id号

from multiprocessing import Process
import time,os

def task():
    print('%s is running' %os.getpid())
    time.sleep(3)

if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    p.join() # 等待进程p结束后,join函数内部会发送系统调用wait,去告诉操作系统回收掉进程p的id号

    print(p.pid) #???此时能否看到子进程p的id号
    print('主')

答案

#答案:可以
#分析:
p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以,
此时在父进程内还可以看到p.pid,但此时的p.pid是一个无意义的id号,因为操作系统已经将该编号回收

打个比方:
我党相当于操作系统,控制着整个中国的硬件,每个人相当于一个进程,每个人都需要跟我党申请一个身份证号
该号码就相当于进程的pid,人死后应该到我党那里注销身份证号,p.join()就相当于要求我党回收身份证号,但p的家人(相当于主进程)
仍然持有p的身份证,但此刻的身份证已经没有意义

守护进程

主进程创建守护进程

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

# -*- coding: utf-8 -*-
# @Author  : JKey  
# Timer    : 2021/1/19 23:32
from multiprocessing import Process
import time


def task(name):
    print(f'{name} 还活着')
    time.sleep(3)
    print(f'{name} 死掉了')


if __name__ == '__main__':
    p1 = Process(target=task, args=('liu',))
    p1.daemon = True  # 将子进程设置成守护进程 这一句一定要放在start方法上面才有效,否则会报错
    p1.start()
    # p1.daemon = True  # AssertionError: process has already started
    print('主的死掉了')

迷惑人的例子 他们打印的先后顺序,或者说执行顺序

主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")

打印的几种情况

1 """
main-------
456
end456
"""

2 """
main-------
123
456
end456
"""

3 """
123
main-------
456
end456
"""

互斥锁

概念: 在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性.每个对象都对应一个额可称为"互斥锁"的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象

进程之间数据不共享, 但是共享同一套文件系统, 所以访问同一个文件,或者同一个打印终端,是没有问题的.

而共享带的是竞争, 竞争带来的结果就是混乱, 如何控制,就是加锁处理

案例1: 多个进程共享一个打印终端

并发运行,效率高,,但竞争同一打印到终端,带来了错乱

from multiprocessing import Process
import os, time, random


def work():
    print('%s is running' % os.getpid())
    time.sleep(random.random() * 10)  # 睡得时间随机,表示不同得子进程的工作时间不一
    print('%s is done' % os.getpid())


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work)
        p.start()


打印效果

3628 is running
3588 is running
6968 is running
3588 is done  # 第一个就乱了顺序
3628 is done
6968 is done

加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争

from multiprocessing import Process, Lock
import os, time, random


def work(lock):
    lock.acquire()  # 抢到锁的就往下执行
    print('%s is running' % os.getpid())
    time.sleep(random.random() * 10)
    print('%s is done' % os.getpid())
    lock.release()  # 执行完就释放


if __name__ == '__main__':
    lock = Lock()  # 做一个父进程的唯一的锁
    for i in range(3):
        p = Process(target=work, args=(lock, ))
        p.start()

输出结果

13712 is running
13712 is done
9416 is running
9416 is done
15160 is running
15160 is done

案例2:多个进程共享同一文件

文件当数据库,模拟抢票

并发运行,效率高,但竞争写同一文件,数据写入错乱

代码如下

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('33[43m剩余票数%s33[0m' %dic['count'])

def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('33[43m购票成功33[0m')

def task(lock):
    search()
    get()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3): #模拟并发3个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()

输出结果

剩余票数1
剩余票数1
剩余票数1
购票成功
购票成功
购票成功

当没加锁时,你会发现明明只有一张票了,却被好多人都买到了,我靠,这不上车的时候要打起来!

所以我们对要修改数据的操作,加上锁,一次只能一个人修改,下个人来的就是上一个人修改之后的文件信息

from multiprocessing import Process, Lock
import time, json, random


def search():
    dic = json.load(open('db.json'))
    print('33[43m剩余票数%s33[0m' % dic['count'])


def get():
    dic = json.load(open('db.json'))
    time.sleep(0.1)  # 模拟读数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模拟写数据的网络延迟
        json.dump(dic, open('db.json', 'w'))
        print('33[43m购票成功33[0m')
    else:
        print('33[43m购票失败33[0m')


def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(3):  # 模拟并发100个客户端抢票
        p = Process(target=task, args=(lock,))
        p.start()


输出结果:

剩余票数1
剩余票数1
剩余票数1
购票成功
购票失败
购票失败

###  总结: 

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。




## 进程间的通讯(IPC)

共享内存

### 管道

管道的一些应用场景

- `tasklist | findstr xxx`
- ps aux | grep xxx

#### 队列

**队列等于管道加锁**

概念:

- 进程彼此之间相互隔离,要实现进程间通信(IPC),multiprocessing 模块支持 俩种形式: 队列和管道,这两种方式都是使用消息传递的.

创建队列的类(底层就是以管道和锁定的方式实现)

- Queue([maxsize]): 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传输.

参数介绍:

```python
maxsize:是队列种允许最大项数,省略则会有一个默认的最大值

最后可以查看到队列的默认的最大数字,但因为内存的关系,可能也放不到这么多

方法介绍

主要方法:

q.put 方法

用以插入数据到队列中,put方法还有俩个可选参数: blocked 和 timeout.
如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间.
如果超时,会抛出 queue.Full 的异常, 如果blocked 为 False, 但该Queue已满,会立即抛出queue.Full的异常

q.get 方法

可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出_queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出_queue.Empty异常.

q.get_nowait() 方法 :同q.get(False)

from multiprocessing import Process, Queue

q = Queue(3)  # 立马就创建一个内存空间,是一个共享的
# put() 方法向队列中存放值
q.put([1, 2, 3])
q.put({'name': 1})
# print(q.full())  # False
q.put('this is cx')
# print(q.full())  # True
# print(q.put_nowait(1))  # queue.Full
# q.put(10)  # 当一个队列内被放满时,还有值要发就必须要等到队列有位置的时候,才能放入,不然默认会一直等待
# 10 表示的是加入队列的值, block表示的是加锁 True表示的是锁生效, timeout表示的是超时时间,如果这个时间队列还没有空闲的位置就会抛异常
# q.put(10, block=True, timeout=3)  # queue.Full
# 也可以直接不加锁, 队列还没有空闲的位置就会直接抛异常
# q.put(10, block=False)  # queue.Full
"""
所以我们可以捕捉这些异常,当发生时,就可以提醒队列满了,或者做其他的事
"""

# get() 方法,去队列中取值
print(q.get())
print(q.qsize())  # 2
print(q.get())
print(q.empty())  # False
print(q.get())
print(q.get_nowait())  # _queue.Empty
print(q.empty())  # True

# print(q.get())  # 和put一样,取值,队列没有,会一直阻塞原地,等到他有值取,才结束阻塞
# 和put方法一样,也可以设置加锁和超时时间
# print(q.get(block=True, timeout=2))  # 抛出的异常类型 _queue.Empty
# print(q.get(block=False))

生产者和消费者模型

概念:

  • 在并发编程中使用生产者和消费者模式能够解决大多数问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度.

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程,在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

存在一些问题的 模型

from multiprocessing import Process, Queue, JoinableQueue
import time, random


def producer(q, name, food):
    for i in range(1, 3):
        res = f'{food}{i}'
        time.sleep(random.randint(1, 2))

        q.put(res)
        print(f'生产者{name}造了{res}成功')


def consumer(q, name): 
    while True:  # 模拟消费者胃口很大,来多少食物吃多少食物
        res = q.get()  # 会出现一个情况, 没有数据就会卡住
        time.sleep(random.randint(1, 2))
        print(f"消费者{name} 吃了 {res}")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, "jkey", "包子"))
    p1 = Process(target=producer, args=(q, "song", "泔水"))

    c1 = Process(target=consumer, args=(q, 'xiong'))
    c2 = Process(target=consumer, args=(q, 'zui'))
    c1.daemon = True
    c2.daemon = True
    p.start()
    p1.start()
    c1.start()
    c2.start()

    p.join()
    p1.join() 

但是这个案例会有一个问题,那就是在消费者的.get() 方法 , 我们知道当队列中的数据被取完时,它就会阻塞在原地.那主进程又要等着子进程运行完毕才结束主进程的生命周期,所以就会出现出现卡住的情况.

那我们要怎么解决呢?

那我们只需要在生产者生产完食物的时候,再发送一个制造完毕的信号到队列中,消费者取的时候就可以根据这个提示信号,来结束掉当前的子进程的运行,即可

但是你得让全部的消费者进程都结束掉,一个None信号被一个消费者进程接受,意味着,有几个消费者就得发几个结束信息.

from multiprocessing import Process, Queue, JoinableQueue
import time, random


def producer(q, name, food):
    for i in range(1, 3):
        res = f'{food}{i}'
        time.sleep(random.randint(1, 2))

        q.put(res)
        print(f'生产者{name}造了{res}成功')


def consumer(q, name):
    while True:
        res = q.get()  # 会出现一个情况, 没有数据就会卡住
        if res is None: break
        time.sleep(random.randint(1, 2))
        print(f"消费者{name} 吃了 {res}")


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, "jkey", "包子"))
    p1 = Process(target=producer, args=(q, "song", "泔水"))

    c1 = Process(target=consumer, args=(q, 'xiong'))
    c2 = Process(target=consumer, args=(q, 'zui'))
    c1.daemon = True
    c2.daemon = True
    p.start()
    p1.start()
    c1.start()
    c2.start()

    p.join()
    p1.join()
    q.put(None) 
    q.put(None)

但是这样解决不够精明,因为当我们的消费者数量增加的时候,发送的None的结束型号的个数也要随着增加.

那其实我们的进程类里面还有一种对象模块 joinableQueue模块

是一个可以等待的队列模块,该模块除了比Queue模块多了俩个方法之外,其他的功能都是一样的.

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。    

  #方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

于是我们的模型就变成了这样

from multiprocessing import Process, JoinableQueue
import time, random


def producer(q, name, food):
    for i in range(1, 3):
        res = f'{food}{i}'
        time.sleep(random.randint(1, 2))

        q.put(res)
        print(f'生产者{name}造了{res}成功')
    q.join()  # 等放到队列里面的值被取干净了


def consumer(q, name):
    while True:
        res = q.get()  # 会出现一个情况, 没有数据就会卡住
        # if res is None: break
        time.sleep(random.randint(1, 2))
        print(f"消费者{name} 吃了 {res}")
        q.task_done()  # 告诉队列已经从里面取出了一个数据并且处理完毕了


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()  # 可被等待的Q
    p = Process(target=producer, args=(q, "jkey", "包子"))
    p1 = Process(target=producer, args=(q, "song", "泔水"))

    c1 = Process(target=consumer, args=(q, 'xiong'))
    c2 = Process(target=consumer, args=(q, 'zui'))
    c1.daemon = True
    c2.daemon = True
    p.start()
    p1.start()
    c1.start()
    c2.start()

    p.join()
    p1.join()  # 意味着 俩生产者已经生产完数据了, 并且因为有了q.join所以 现在害意味着队列中的数据也被取空了,所以,消费者就就没有存在的必要了 于是将消费者都设置成守护进程
    # q.put(None)
    # q.put(None)


"""
q.join() 和 q.task_done 是一个对应的关系
join() 表示等放到一个队列中的数据被拿走  ; 一般与生产者 即 队列的 put结合
而task_done是表示我已经在队列里面拿了一个值了,并且处理完毕了  ; 一般与消费者 即 队列的get 结合
"""

共享数据

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

还可以扩展到分布式系统中

进程间通信应该尽量避免使用本节所讲的共享数据的方式

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的

虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

manager()返回的manager对象控制一个服务器进程,该进程保存Python对象,并允许其他进程使用代理操作它们。

manager()返回的管理器将支持类型list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value和Array。例如,

进程之间操作共享的数据

对同一个字典中的数量进程修改操作

# manager这里可以共享列表,字典等很多数据类型
from multiprocessing import Process, Manager, Lock


def work(d, lock):
    with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
        d['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 100})  # 生成一个字典,可在多个进程间共享和传递
        p_l = []
        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

信号量

信号量Semahpore(同线程一样)

信号量和互斥锁的区别

互斥锁 同时只允许一个线程更改数据, 而 semahpore 是同时允许一定数量的线程更改数据.

比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

案例 : 去公共厕所 上厕所

from multiprocessing import Process, Semaphore
import time, random


def go_wc(sem, user):
    sem.acquire()
    t = time.strftime('%X')
    print(f'{user}在{t}时间上 占到了一个茅坑')
    time.sleep(random.randint(0, 2))   # 上厕所的时间
    sem.release()


if __name__ == '__main__':
    sem = Semaphore()  # 信号量 和 互斥锁一样的用法
    p_l = []
    for i in range(12):
        p = Process(target=go_wc, args=(sem, f"user{i}"))
        p.start()
        p_l.append(p)
    for i in p_l:
        i.join()

    print('+++++++++')

大总结

总结一下,我们今天学的知识

1.什么是僵尸进程

  • 僵尸进程指的是当一个进程运行完毕之后,不会立马死掉,而是会将重要的资源释放掉(cpu,内存,打开的文件等),但是还是会保留一些状态信息,比如 pid, 僵尸进程是一个数据结构,是每个子进程都会经历的一个过程.除了 pid为1的进程,就是我们的init进程,它死了就意味着关机了.
  • 僵尸进程本身是没有害的,但是当它的父进程,是一个只知道造子进程而不会为这些子进程运行完变成僵尸进程时回收掉这些资源,或者它回收的间隔没有僵尸进程创建的快,这个时候才是有害的,就会造成pid不够用,后面的程序运行不进来.
  • 如何处理呢?
    • 1.向父进程发送 kill -CHLD pid 的命令告诉父进程去回收僵尸子进程
    • 2.当查看还有僵尸子进程时,就可以发送 kill -9 父pid号 将这个产生好多僵尸进程的进程给杀死掉.
    • 3.为这个进程后面加上 signal模块 signal(SIGCHLD, SIG_IGN)

2.什么是孤儿进程

  • 当一个进程的父进程意外死亡时,这个进程就变成了孤儿进程,这些孤儿进程会被pid为1的init管理回收.
  • 孤儿进程是无害的

3.守护进程

  • 守护进程指的是当给一个进程加上守护进程时,守护的进程的代码执行完后这个守护进程也会一起"死亡".
  • p.dameo() = True
  • 一定要放到p.start的前面

4.互斥锁

  • 互斥锁是用来保证共享数据的安全性,保证共享数据操作的完整性.
  • 例如,同时抢票,你看到的那个时间段的票可能是10张,但是过了一会儿后,(在一会的时间那十张票已经卖完了)你要购买票的时候,你应该提醒的是购票失败.而不是你也购票成功,不然就会导致你和别人买的是一个座位.

5.ipc 进程间的通讯

  1. 管道

    1. 一般应用在命令中的比较多

      如: tasklist | findstr pid

      ​ ps aux | grep pid

  2. 队列

    1. Queue模块
      1. put()往里面存值
      2. get() 去立马取值
      3. .full() 方法 判断队列中的个数有没有放满 (不够准确,会存在碰巧,刚判断完,就取了一个值)
      4. .empty() 判断队列中的数据有没有被取完,也不够准确

    这俩种方法都是为了实现俩个不同的进程 之间进行通信

6.生产者消费者模型

  1. 这是一种模式, 为的就是让生产者不用等到消费者消费完位的产品之后,我才开始生产,而是可以通过一个媒介.来让生产者和消费者都能不用等待对方完成任务之后才能接着干活的情况.而这个媒介就是我们的队列.
  2. 比如: 生产者就是一个卖包子的,消费者就是一个买包子的,而卖包子的不能在造完包子之后直接扔在买包子的嘴里,买包子的也不用蹲在那里等着,而是我通过一个蒸笼,我造好了包子,就往蒸笼里面放,你买包子的就可以直接在蒸笼里面拿.这就释放出了,造包子的不用等你去拿而腾不出手来造包子.

7.共享数据

  1. 共享数据是 一种不需要采用管道或者队列就能实现俩个进程之间相互通信的
  2. 它共享的数据可以是任何的数据类型
  3. 使用 Manager 模块, 造一个 manager对象,该对象可以基于自己下的数据类型让多进程实现通信

8.信号量

  1. 信号量和互斥锁一样,都是通过加锁来对数据进行安全保护的,
  2. 但是信号量运行多个人用一把锁,也就是同一时间,可以有多个人更改数据.
  3. 互斥锁就好比家用厕所,一次只能有一个人上厕所,而信号量就好比公共厕所,可以支持5个人同一时间上厕所.
原文地址:https://www.cnblogs.com/jkeykey/p/14304907.html