python 并发编程之多进程

一、 进程理论

1.什么是进程

进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu

2.程序与进程的区别

程序仅仅只是一堆代码而已,而进程指的是程序的运行过程

举例:

厨师 为 A客人制作蛋糕,厨师有:

  • 做蛋糕的食谱
  • 原料:鸡蛋、面粉、水

在这个比喻中:

  • 食谱对应程序
  • 厨师对应计算机中的CPU
  • 原料 为输入的数据
  • 厨师阅读食谱、取出原料、制作的等一系列动作的总和对应 计算机中的进程

这个时候来了一个客人B,要求先为客人B制作蛋挞,因为 B 更加重要,于是,厨师记录下刚刚蛋糕做到哪一步了(保存进程状态)。

然后开始看蛋挞的食谱,开始做蛋挞(切换到高优先级的进程)

蛋挞做完后,回来继续做蛋糕,从他离开时候的那一步继续往下做。

需要强调的是:同一个程序执行两次,那也是两个进程,比如,打开两个QQ,各自登录不同的账号

3.并发与并行

无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真正干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

1.并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发

举例(单核+多道,实现多个进程的并发执行):

张三同一时间要做多件事情:写作业、练听力、看美剧,但是张三同一个时间里只能做一件事,

所以为了实现多个任务并发执行的效果,可以写一会作业,然后练习听力,再看一会美剧


2.并行:同时运行,只有具备多个cpu才能实现并行

单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的

有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,

一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术

而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算)

可能被分 配给四个cpu中的任意一个去执行

4.进程的创建 (了解)

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:

  1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
  2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。

关于创建的子进程,UNIX和windows

1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。

2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。

5.进程的终止

  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

  4. 被其他进程杀死(非自愿,如kill -9)

6.进程的层次结构

无论UNIX还是windows,进程只有一个父进程,不同的是:

  1. 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。
  2. 在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。

7.进程并发的实现

进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)

该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。

8. 补充 必备理论基础

一 操作系统的作用:
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
二 多道技术:
1.产生背景:针对单核,实现并发
ps:
现在的主机一般是多核,那么每个核都会利用多道技术
有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片
强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
才能保证下次切换回来时,能基于上次切走的位置继续运行


二、开启进程的两种方式

1. multiprocessing 模块

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。

multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数)

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,>提供了Process、Queue、Pipe、Lock等组件。

注意:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

Process 类 说明:

创建进程的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数说明:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

name为子进程的名称

方法说明:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

属性说明:

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

2.开启进程的两种方式

第一种方式:

# 方式一
from multiprocessing import Process
import time


def task(name):
    print('%s is running'%name)
    time.sleep(2)
    print('%s is done'%name)


if __name__ == '__main__':
    # p = Process(target=task, kwargs={'name': '子进程1'})
    p = Process(target=task, args=('子进程1',))  # 两种方式都可以,这种方式一定要加逗号
    p.start()  # 给操作系统发一个信号,也仅仅是发一个信号,是否开启子进程是操作系统自己决定的

    print('主')
  
-------------------执行结果----------------
主
子进程1 is running
子进程1 is done

第二种方式:

# 方式二
from  multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):   #这里必须是叫 run ,命名必须为run
        print('%s is running' %self.name)
        time.sleep(2)
        print('%s is done' %self.name)


if __name__ == '__main__':
    p = MyProcess('子进程1')
    p.start()
    print('主')
    
-------------------执行结果----------------
主
子进程1 is running
子进程1 is done

尾巴:在windows中Process()必须放到# if name == 'main':下

进程之间内存空间是隔离的:

from multiprocessing import Process

n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了

def work():
    global n
    n=0
    print('子进程内: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)
   
-------------------执行结果----------------   
主进程内:  100
子进程内:  0

查看pid

from multiprocessing import Process
import time, os


def task():
    print('%s is running, parent id is <%s>'%(os.getpid(), os.getppid()))
    time.sleep(2)
    print('%s is done, parent id is <%s>' %(os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task)
    p.start()

    print('主', os.getpid(),os.getppid())
 
-------------------执行结果----------------   
主 7364 11808
10856 is running, parent id is <7364>
10856 is done, parent id is <7364>

11808 是 pycharm 的pid

3. 基于多进程实现并发的套接字通信

客户端:

import socket
gd_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
gd_client.connect(('127.0.0.1', 9969))

while True:
    cmd = input('>>:').strip()
    gd_client.send(cmd.encode('utf-8'))

    data = gd_client.recv(1024)
    print(data.decode('utf-8'))

服务端:

import socket
from multiprocessing import Process

def talk(conn):
    data = conn.recv(1024)
    print(data.decode('utf-8'))
    conn.send(data.upper())


def server():
    gd_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    gd_server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
    gd_server.bind(('127.0.0.1', 9969))
    gd_server.listen(5)

    while True:
        conn, addr = gd_server.accept()
        p = Process(target=talk, args=(conn,))
        p.start()


if __name__ == '__main__':
    server()

这样,先运行服务端,然后客户端就可以运行多个了。

4.join 方法 和 is_alive,terminate

(1) join

在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况

  • 情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。
  • 情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用
from multiprocessing import Process
import time, os


def task(name, n):
    print('%s is running' %name)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子进程1', 5))
    p2 = Process(target=task, args=('子进程2', 3))
    p3 = Process(target=task, args=('子进程3', 2))

    # p1.start()
    # p2.start()
    # p3.start()
    p_list = [p1, p2, p3]  # 简写方式
    for p in p_list:
        p.start()

    # p1.join()
    # p2.join()
    # p3.join()
    for p in p_list:   # 简写方式
        p.join()

    print('主', time.time()-start)
    
-------------------执行结果----------------   
子进程1 is running
子进程2 is running
子进程3 is running
主 5.246000051498413

尾巴: join 是等待进程结束,p.join() 是让主进程等待,卡住的是主进程,子进程不会被卡住,所以这种方式并是串行。

进程只要start就会在开始运行了,所以p1-p3 ,在.start()时,系统中已经有四个并发的进程了

而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键

join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3仍然在运行,等#p1.join结束,可能p2,p3早已经结束了,这样p2.join,p3.join直接通过检测,无需等待

所以3个join花费的总时间仍然是耗费时间最长的那个进程运行的时间

我们看下面这段代码,如果是这么写的话,就得等 p1完了之后再执行 p2,p2完了再执行p3

from multiprocessing import Process
import time, os


def task(name, n):
    print('%s is running' %name)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子进程1', 5))
    p2 = Process(target=task, args=('子进程2', 3))
    p3 = Process(target=task, args=('子进程3', 2))

    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print('主', time.time()-start)
    
-------------------执行结果----------------   
子进程1 is running   (隔5s)
子进程2 is running	  (隔3s)
子进程3 is running	  (隔2s)
主 10.475000143051147  

(2) is_alive ,terminate

from multiprocessing import Process
import time, os


def task():
    print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid()))
    time.sleep(2)
    print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p = Process(target=task,)
    p.start()
    print(p.is_alive())  # True

    p.terminate() # 关闭进程,不会立刻关闭,所以is_alive 立刻查看的结果可能还是活的
    print(p.is_alive())  # True
    p.join()
    print('主')
    print(p.is_alive())  # False
    
-------------------执行结果----------------   
True
True
主
False

(3) name, pid

from multiprocessing import Process
import time


def task(nick_name):
    print('%s is running' %nick_name)
    time.sleep(2)
    print('run end')


if __name__ == '__main__':
    p = Process(target=task, args=('鸣人',) , name='子进程1')
    p.start()
    print(p.name, p.pid)
    
-------------------执行结果----------------   
子进程1 9812
鸣人 is running
run end

三、守护进程

主进程创建子进程,然后将该进程设置成守护自己的进程

两个条件:

  • 守护进程会在主进程代码执行结束后就终止

  • 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止

import time
from multiprocessing import Process


def task(name):
    print('%s is running' %name)
    time.sleep(2)
    # p = Process(target=time.sleep, args=(3,))  # 守护进程不允许自己再创建自己的子进程,会报错
    # p.start()


if __name__ == '__main__':
    p = Process(target=task, args=('进程1',))
    p.daemon = True   # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()

    print('主...')  # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
    
    
-------------------执行结果----------------   
主...

四、互斥锁

1.互斥锁

from multiprocessing import Process
import time

def task(name):
    print('%s A' %name)
    time.sleep(1)
    print('%s B' %name)
    time.sleep(1)
    print('%s C' %name)

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=task,args=('进程%s' %i,))
        p.start()

-------------------执行结果---------------- 
进程0 A
进程1 A
进程2 A
进程0 B
进程1 B
进程2 B
进程0 C
进程1 C
进程2 C

互斥锁的原理,就是把并发改成串行,降低了效率,但保证了数据安全不错乱

比如:A B C 三个人使用卫生间,A上卫生间,门上锁,B C 要等候,等到A完成任务后释放锁,B C 才有可能抢到,及把并行改为串行

from multiprocessing import Process,Lock
import time

def task(name, mutex):
    mutex.acquire()
    print('%s A' %name)
    time.sleep(1)
    print('%s B' %name)
    time.sleep(1)
    print('%s C' %name)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(3):
        p=Process(target=task,args=('进程%s' %i,mutex))
        p.start()


-------------------执行结果---------------- 
进程0 A
进程0 B
进程0 C
进程1 A
进程1 B
进程1 C
进程2 A
进程2 B
进程2 C

2.抢票模拟

多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务

db.txt

{"count": 1}
from multiprocessing import Process
import json, time

def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看剩余票数:%s' %(name, dic['count']))


def get(name):
    time.sleep(2)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 购票成功' %name)


def task(name):
    search(name)
    get(name)

if __name__ == '__main__':

    for i in range(10):
        p = Process(target=task, args=('旅客%s'%i,))
        p.start()
        
        
-------------------执行结果---------------- 
<旅客0> 查看剩余票数:1
<旅客1> 查看剩余票数:1
<旅客3> 查看剩余票数:1
<旅客4> 查看剩余票数:1
<旅客5> 查看剩余票数:1
<旅客2> 查看剩余票数:1
<旅客7> 查看剩余票数:1
<旅客8> 查看剩余票数:1
<旅客9> 查看剩余票数:1
<旅客6> 查看剩余票数:1
<旅客0> 购票成功
<旅客1> 购票成功
<旅客3> 购票成功
<旅客4> 购票成功
<旅客5> 购票成功
<旅客2> 购票成功
<旅客7> 购票成功
<旅客8> 购票成功
<旅客9> 购票成功
<旅客6> 购票成功

-------
db.txt: {"count": 0}

所有人查到的都是1张票,然后,10个人都买成功了,这是并行产生的问题

所以,要利用互斥锁,改为串行执行

# db.txt   {"count": 0}
from multiprocessing import Process,Lock
import json, time

def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看剩余票数:%s' %(name, dic['count']))


def get(name):

    time.sleep(2)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 购票成功' %name)



def task(name):
    search(name)
    mutex.acquire()
    get(name, mutex)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=('旅客%s' %i, mutex))
        p.start()
      
      
-------------------执行结果----------------         
<旅客1> 查看剩余票数:3
<旅客0> 查看剩余票数:3
<旅客3> 查看剩余票数:3
<旅客2> 查看剩余票数:3
<旅客5> 查看剩余票数:3
<旅客7> 查看剩余票数:3
<旅客4> 查看剩余票数:3
<旅客9> 查看剩余票数:3
<旅客6> 查看剩余票数:3
<旅客8> 查看剩余票数:3
<旅客1> 购票成功
<旅客0> 购票成功
<旅客3> 购票成功

3.互斥锁与join

互斥锁只对局部的针对共享数据的部分进行操作,join会将整个进程变为串行,二互斥锁可以针对一个进程中的局部部分进行处理

将上面抢票的例子改为join:

from multiprocessing import Process,Lock
import json, time

def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看剩余票数:%s' %(name, dic['count']))


def get(name):

    time.sleep(2)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 购票成功' %name)
    else:
        print('<%s> 购票失败' % name)



def task(name):
    search(name)
    # mutex.acquire()
    get(name)
    # mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        # p = Process(target=task, args=('旅客%s' %i, mutex))
        p = Process(target=task, args=('旅客%s' %i,))
        p.start()
        p.join()
        
        
-------------------执行结果----------------  
<旅客0> 查看剩余票数:3
<旅客0> 购票成功
<旅客1> 查看剩余票数:2
<旅客1> 购票成功
<旅客2> 查看剩余票数:1
<旅客2> 购票成功
<旅客3> 查看剩余票数:0
<旅客3> 购票失败
<旅客4> 查看剩余票数:0
<旅客4> 购票失败
<旅客5> 查看剩余票数:0
<旅客5> 购票失败
<旅客6> 查看剩余票数:0
<旅客6> 购票失败
<旅客7> 查看剩余票数:0
<旅客7> 购票失败
<旅客8> 查看剩余票数:0
<旅客8> 购票失败
<旅客9> 查看剩余票数:0
<旅客9> 购票失败

发现使用join将并发改成穿行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时join与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行

def task(name,):
    search(name) # 并发执行

    lock.acquire()
    get(name) #串行执行
    lock.release()

4.总结

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

1、效率低(共享数据基于文件,而文件是硬盘上的数据)

2、需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

五、队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

参数:

maxsize是队列中允许最大项数,省略则无大小限制。
但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
    

方法:

q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。

队列的使用:

from multiprocessing import Queue

q = Queue(3)  # 明确队列里放三个数据

q.put('hello')
q.put({'name':'nurato'})
print(q.full())  # False
q.put([1,2,3])
print(q.full())  # True 判断队列是否放满了

# 在队列已经放满的情况下
# q.put(4)  # 在运行的时候,此时程序会卡在这里,等待队列里取走一个,才能再往队列了放

# q.get() 从队列取数据,先进先出
print(q.get())  # hello
print(q.get())  # {'name': 'nurato'}
print(q.get())  # [1, 2, 3]
print(q.empty())  # False  队列为空,返回True
print(q.get())  # 这个时候再取就卡住了


-------------------执行结果----------------  
False
True
hello
{'name': 'nurato'}
[1, 2, 3]
True

六、生产者消费者模型

为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者和消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

from multiprocessing import Process, Queue
import time


def producer(q, name):
    for i in range(10):
        res = "%s_蛋糕%s" %(name,i)
        time.sleep(0.5)
        print('%s生产了%s' %(name, res))
        q.put(res)


def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break  #队列是先进先出的,最后进去的三个是None,所以我们用None来做判断
        time.sleep(1)
        print('%s 吃了 (%s)' %(name,res))


if __name__ == "__main__":
    # 容器
    q = Queue()

    # 生产者
    p1 = Process(target=producer, args=(q, '肯德基'))
    p2 = Process(target=producer, args=(q, '德克士'))
    p3 = Process(target=producer, args=(q, '田老师'))


    # 消费者
    c1 = Process(target=consumer, args=(q, '张三'))
    c2 = Process(target=consumer, args=(q, '李四'))


    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)  #有几个消费者,就put几个None
    q.put(None)
    q.put(None)
    print('主')
    
  

    
-------------------执行结果----------------  
德克士生产了德克士_蛋糕0
田老师生产了田老师_蛋糕0
肯德基生产了肯德基_蛋糕0
德克士生产了德克士_蛋糕1
田老师生产了田老师_蛋糕1
肯德基生产了肯德基_蛋糕1
德克士生产了德克士_蛋糕2
张三吃了德克士_蛋糕0
田老师生产了田老师_蛋糕2
李四吃了田老师_蛋糕0
肯德基生产了肯德基_蛋糕2
德克士生产了德克士_蛋糕3
田老师生产了田老师_蛋糕3
肯德基生产了肯德基_蛋糕3
张三吃了肯德基_蛋糕0
德克士生产了德克士_蛋糕4
田老师生产了田老师_蛋糕4
李四吃了德克士_蛋糕1
肯德基生产了肯德基_蛋糕4
德克士生产了德克士_蛋糕5
田老师生产了田老师_蛋糕5
肯德基生产了肯德基_蛋糕5
张三吃了田老师_蛋糕1
德克士生产了德克士_蛋糕6
田老师生产了田老师_蛋糕6
李四吃了肯德基_蛋糕1
肯德基生产了肯德基_蛋糕6
德克士生产了德克士_蛋糕7
田老师生产了田老师_蛋糕7
肯德基生产了肯德基_蛋糕7
张三吃了德克士_蛋糕2
德克士生产了德克士_蛋糕8
田老师生产了田老师_蛋糕8
李四吃了田老师_蛋糕2
肯德基生产了肯德基_蛋糕8
德克士生产了德克士_蛋糕9
田老师生产了田老师_蛋糕9
肯德基生产了肯德基_蛋糕9
主
张三吃了肯德基_蛋糕2
李四吃了德克士_蛋糕3
张三吃了田老师_蛋糕3
李四吃了肯德基_蛋糕3
张三吃了德克士_蛋糕4
李四吃了田老师_蛋糕4
张三吃了肯德基_蛋糕4
李四吃了德克士_蛋糕5
张三吃了田老师_蛋糕5
李四吃了肯德基_蛋糕5
张三吃了德克士_蛋糕6
李四吃了田老师_蛋糕6
张三吃了肯德基_蛋糕6
李四吃了德克士_蛋糕7
张三吃了田老师_蛋糕7
李四吃了肯德基_蛋糕7
张三吃了德克士_蛋糕8
李四吃了田老师_蛋糕8
张三吃了肯德基_蛋糕8
李四吃了德克士_蛋糕9
张三吃了田老师_蛋糕9
李四吃了肯德基_蛋糕9
    

以上的方式,只是一种编程思路。实际开发中很少采用

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

# 基于JoinableQueue实现生产者消费者模型
from multiprocessing import Process, JoinableQueue
import time


def producer(q, name):
    for i in range(2):
        res = 'cake %s' %i
        time.sleep(0.5)
        print('%s 生产了 %s' %(name,res))

        q.put(res)
    q.join()


def consumer(q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(1)
        print('消费者吃了%s' % res)
        q.task_done()


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=producer, args=(q,'pro_01' ))
    p2 = Process(target=producer, args=(q,'pro_02' ))
    p3 = Process(target=producer, args=(q,'pro_03' ))


    # 消费者们
    c1 = Process(target=consumer, args=(q, ))
    c2 = Process(target=consumer, args=(q, ))
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('主......')
    
    
-------------------执行结果----------------  
pro_02 生产了 cake 0
pro_01 生产了 cake 0
pro_03 生产了 cake 0
pro_02 生产了 cake 1
pro_01 生产了 cake 1
pro_03 生产了 cake 1
消费者吃了cake 0
消费者吃了cake 0
消费者吃了cake 0
消费者吃了cake 1
消费者吃了cake 1
消费者吃了cake 1
主......

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

生产者消费者 模型总结

  • 程序有两类角色:一类负责生产数据(生产者),另一类负责处理数据(消费者)
  • 引入生产者消费者模型可以解决: 平衡生产者与消费者之间的速度差,降低程序耦合
  • 如何实现消费者模型:生产者<--->队列<--->消费者
原文地址:https://www.cnblogs.com/friday69/p/9585032.html