并发编程之多进程(实践)

Python多进程模块

multiprocessing模块介绍

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

注意: 进程是系统资源分配的最小单位, 所以进程间不共享全局变量

Process类介绍

multiprocessing模块中有一个很重要的类叫做Process类, Process类是创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

方法介绍:
1 p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 

属性介绍:
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

注意:在windows中Process()必须放到# if __name__ == '__main__':下

创建进程

创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去。

一个进程对应在内存中就是一块独立的内存空间, 多个进程对应在内存中就是多块独立的内存空间

进程与进程之间数据默认情况下是无法直接交互的。即进程之间默认不共享全局变量

创建进程的方式有两种: 基于Process类创建进程和继承Process类创建进程

基于Process类创建进程

from multiprocessing import Process
import time


def task(name):
    print('{} is running'.format(name))
    time.sleep(3)
    print('{} is over'.format(name))


if __name__ == '__main__':
    # 1. 创建对象
    # 容器类型就算里面只有一个元素, 建议一定要用逗号隔开
    p = Process(target=task, args=('feather',))
    # 2. 开启进程
    # 告诉操作系统帮你创建一个进程  异步
    p.start()
    print('主进程')

继承Process类创建进程

from multiprocessing import Process
import time


class MyProcess(Process):
    def run(self) -> None:
        print('start...')
        time.sleep(3)
        print('stop...')


if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('主进程')

join方法介绍

join方法主要用途是让主进程等待子进程运行结束。

from multiprocessing import Process
import time


def task(name, n):
    print('%s is running' % name)
    time.sleep(n)
    print('%s is over' % name)


if __name__ == '__main__':
    p1 = Process(target=task, args=('featherwit', 1))
    p2 = Process(target=task, args=('zhangjie', 2))
    p3 = Process(target=task, args=('xiaoming', 3))

    start_time = time.time()
    p1.start()
    p2.start()
    p3.start()

    # p.join()  # 主进程等待子进程运行结束之后再继续往后执行
    p1.join()
    p2.join()
    p3.join()

    print('主进程结束...', time.time() - start_time)  # 主进程结束... 3.0125980377197266
p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p。

进程只要start就会在开始运行了,所以p1-p3.start()时,系统中已经有四个并发的进程了

而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键

join是让主线程等,而p1-p3仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等3p1.join结束,可能p2,p3早已经结束了,这样p2.join,p3.join直接通过检测,无需等待

所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间

同理, 上述代码可以使用for循环进行实现

from multiprocessing import Process
import time


def task(name, n):
    print('%s is running' % name)
    time.sleep(n)
    print('%s is over' % name)


if __name__ == '__main__':
    start_time = time.time()
    p_list = []
    for i in range(1, 4):
        p = Process(target=task, args=('子进程%i' % i, i))
        p_list.append(p)
        p.start()

    for p in p_list:
        p.join()

    print('主进程结束...', time.time() - start_time)  # 主进程结束... 3.0123398303985596

进程间数据互相隔离的问题

from multiprocessing import Process

money = 100


def task():
    global money  # 局部修改全局变量
    money = 666


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print(money)  # 100

进程对象及其他方法

查看当前进程的id

一台计算机上面运行着很多进程, 那么计算机是如何区分并管理这些进程服务的呢?

计算机会给每一个运行的进程分配一个PID号。

例1: 使用current_process方法查看当前进程的PID

def task():
    print('%s is running' % current_process().pid)  # 查看当前进程的进程号

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主进程: ', current_process().pid)

例2: 使用os模块查看当前进程的PID

def task():
    print('%s is running' % os.getpid())  # 查看当前进程的进程号


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主进程: ', os.getpid())

查看当前进程的父进程的id

from multiprocessing import Process, current_process
import os

def task():
    print('%s is running' % os.getpid())  # 查看当前进程的进程号
    print('子进程的父进程号', os.getppid())


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主进程: ', os.getpid())
    print('主进程的主进程: ', os.getppid())  # 获取父进程的PID

杀死进程以及判断进程是否存活

使用terminate()方法可以杀死当前进程, 然后可以通过is_alive()方法判断当前进程是否存活

def task():
    print('%s is running' % os.getpid())  # 查看当前进程的进程号

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.terminate()  # 杀死当前进程
    print(p.is_alive())  # 判断当前进程是否存活

神奇的是, 这里最后会返回True, 表示进程是存活的, 这是因为terminate()是告诉操作系统帮你去杀死当前进程, 但是需要一定的时间, 而代码的运行速度是非常快的, 所以返回True

僵尸进程及孤儿进程

僵尸进程

一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。

我们知道在unix/linux中,正常情况下子进程是通过父进程创建的,子进程再创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。

因此,unix提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:

  1. 在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)

  2. 直到父进程通过wait/waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait/waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免

任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。

孤儿进程

一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个福利院,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

回收子进程pid号的方法

在操作系统中, 需要回收子进程的pid号, 才能解决僵尸进程的问题, 那么回收子进程的pid号的方法有如下两种:

  • 父进程等待子进程结束
  • 父进程调用join方法, join方法中调用了wait,告诉系统释放僵尸进程

调用了join()方法后, 在父进程中还是能够看到子进程的pid的, 这是因为p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以, 此时在父进程内还可以看到p.pid,但此时的p.pid是一个无意义的id号,因为操作系统已经将该编号回收

守护进程

守护进行会随着主进程结束而结束

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from multiprocessing import Process
import time


def task(name):
    print('%s正常活着' % name)
    time.sleep(3)
    print('%s正常死亡' % name)


if __name__ == '__main__':
    p = Process(target=task, args=('featherwit',))
    # 这一句一定要放在start方法上面才有效否则会直接报错
    p.daemon = True  # 将进程p设置成守护进程
    p.start()
    print('主进程死亡...')

互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱

from multiprocessing import Process
import json
import time
import random

json_str = json.dumps({"ticket_num": 1})


# 查票
def search(i):
    global json_str

    # 读取票数
    dic = json.loads(json_str)
    # 自动取值不要用[]的形式, 推荐使用get
    print('用户%s查询余票: %s' % (i, dic.get('ticket_num')))


# 买票 1. 先查  2. 再买
def buy(i):
    global json_str

    # 先查票
    dic = json.loads(json_str)
    # 模拟网络延时
    time.sleep(random.randint(1, 3))
    # 判断当前是否有余票
    if dic.get('ticket_num') > 0:
        # 买票, 修改余票数量
        dic['ticket_num'] -= 1
        print('%s买票成功' % i)
    else:
        print('%s买票失败' % i)


# 整合上面两个函数
def run(i):
    search(i)
    buy(i)


if __name__ == '__main__':
    for i in range(1, 11):
        p = Process(target=run, args=(i, ))
        p.start()

如何控制,就是加锁处理,  将并发变成了串行, 牺牲效率都是保证了数据的安全

from multiprocessing import Process, Lock
import json
import time
import random

json_str = json.dumps({"ticket_num": 1})


# 查票
def search(i):
    global json_str

    # 读取票数
    dic = json.loads(json_str)
    # 自动取值不要用[]的形式, 推荐使用get
    print('用户%s查询余票: %s' % (i, dic.get('ticket_num')))


# 买票 1. 先查  2. 再买
def buy(i):
    global json_str

    # 先查票
    dic = json.loads(json_str)
    # 模拟网络延时
    time.sleep(random.randint(1, 3))
    # 判断当前是否有余票
    if dic.get('ticket_num') > 0:
        # 买票, 修改余票数量
        dic['ticket_num'] -= 1
        print('%s买票成功' % i)
    else:
        print('%s买票失败' % i)


# 整合上面两个函数
def run(i, mutex):
    search(i)
    # 给买票环节加锁处理
    # 抢锁
    mutex.acquire()
    buy(i)
    # 释放锁
    mutex.release()


if __name__ == '__main__':
    # 在主进程中生成一把锁, 让所有的子进程抢, 谁先抢到, 谁先买票
    mutex = Lock()
    for i in range(1, 11):
        p = Process(target=run, args=(i, mutex))
        p.start()

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:

  • 效率低(共享数据基于文件,而文件是硬盘上的数据)
  • 需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

  • 效率高(多个进程共享一块内存的数据)
  • 帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

进程通信

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

管道: subprocess
队列: 管道 + 锁

Queue类介绍

Queue创建队列的类(底层就是以管道和锁定的方式实现)

# 创建
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

# 参数介绍
maxsize是队列中允许最大项数,省略则无大小限制。

# 方法介绍
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

队列的用法

队列类的简单实用

import queue

# 创建一个队列
q = queue.Queue()  # 括号内可以传数字, 表示生成的队列最大可以同时存放的数据量

# 往队列中存数据
q.put(111)
q.put(222)
q.put(333)
print(q.full())  # 判断当前队列是否满了
print(q.empty())  # 判断当前队列是否空了
q.put(444)
q.put(555)
# q.put(666)  # 当队列中数据放满之后, 如果还有数据要放, 那么程序会阻塞直到有位置让出来, 不会报错

# 在队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
# v6 = q.get() # 当队列中如果已经没有数据的话, get()方法会阻塞
# v6 = q.get_nowait()  # 没有数据直接报错  _queue.Empty
# v6 = q.get(timeout=3)  # 没有数据之后等待3秒之后再报错  _queue.Empty
print(v1, v2, v3, v4, v5)

队列类与多进程结合的实例

from multiprocessing import Queue, Process


def producer(q):
    q.put('我是23号技师, 很高兴为你服务')


def consumer(q):
    print(q.get())


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列的生产者消费者模型

from multiprocessing import Process, Queue
import time
import random


# 生产者
def producer(name, food, q):
    for i in range(1, 5):
        data = '%s生产了%s%s个' % (name, food, i)
        # 模拟延迟
        time.sleep(random.randint(1, 3))
        print(data)
        # 将数据放入队列中
        q.put(data)


# 消费者
def consumer(name, q):
    while True:
        food = q.get()  # 没有数据就会卡住
        time.sleep(random.randint(1, 3))
        print('%s吃了%s' % (name, food))


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=('YXC', 'LS', q))
    p2 = Process(target=producer, args=('DZ', 'SG', q))

    c1 = Process(target=consumer, args=('featherwit', q))
    c2 = Process(target=consumer, args=('zhangjie', q))

    p1.start()
    p2.start()

    c1.start()
    c2.start()
View Code

此时的问题是主进程永远不会结束,原因在于生产者在生产完数据之后就结束了, 但是消费者在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方法无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process, Queue
import time
import random


# 生产者
def producer(name, food, q):
    for i in range(1, 5):
        data = '%s生产了%s%s个' % (name, food, i)
        # 模拟延迟
        time.sleep(random.randint(1, 3))
        print(data)
        # 将数据放入队列中
        q.put(data)


# 消费者
def consumer(name, q):
    while True:
        food = q.get()  # 没有数据就会卡住
        # 判断当前是否有结束的标识
        if food is None: break
        time.sleep(random.randint(1, 3))
        print('%s吃了%s' % (name, food))


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('YXC', 'LS', q))
    p2 = Process(target=producer, args=('DZ', 'SG', q))

    c1 = Process(target=consumer, args=('featherwit', q))
    c2 = Process(target=consumer, args=('zhangjie', q))

    p1.start()
    p2.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()

    # 等待生产者生产完毕之后, 往队列中添加特定的结束符号
    q.put(None)
    q.put(None)
View Code

但是在有多个生产者和消费者的时候, 如果只在队列中添加了一个结束信号, 即只有一个消费者拿到None退出循环, 但是其他的消费者还是会一直处于死循环中且卡在q.get()这一步

解决方法是有多少个消费者就发多少个结束信号, 这是一种很low的做法, 可以使用multiprocessing里的JoinableQueue类来实现这个做法。

JoinableQueue类介绍

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

#参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。    

#方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

JoinableQueue的原理就是, 每当你往队列中存入数据的时候, 内部会有一个计数器+1, 每当你调用task_done()的时候该计数器-1, q.join()方法会在计数器为0的时候才往后运行代码。

使用JoinableQueue实现生产者消费者模型

from multiprocessing import Process, JoinableQueue
import time
import random


# 生产者
def producer(name, food, q):
    for i in range(1, 5):
        data = '%s生产了%s%s个' % (name, food, i)
        # 模拟延迟
        time.sleep(random.randint(1, 3))
        print(data)
        # 将数据放入队列中
        q.put(data)


# 消费者
def consumer(name, q):
    while True:
        food = q.get()  # 没有数据就会卡住
        time.sleep(random.randint(1, 3))
        print('%s吃了%s' % (name, food))
        q.task_done()  # 告诉队列你已经从里面取出了一个数据并且处理完毕


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('YXC', 'LS', q))
    p2 = Process(target=producer, args=('DZ', 'SG', q))

    c1 = Process(target=consumer, args=('featherwit', q))
    c2 = Process(target=consumer, args=('zhangjie', q))

    p1.start()
    p2.start()

    # 将消费者设置为守护进程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    q.join()  # 等待队列中所有的数据被取完再往下执行代码

只要q.join()执行完毕, 说明消费者已经处理完数据, 消费者就没有存在的必要了。那么需要将消费者设置为守护进程的模式才能退出。

原文地址:https://www.cnblogs.com/featherwit/p/13373191.html