python语法基础-并发编程-进程-进程锁和进程间通信

###############   守护进程  ##############

"""
守护进程

父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

"""
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
#     while True:
#         time.sleep(1)
#         print("我还活着")
#
#
# if __name__ == '__main__':
#     p=Process(target=func)
#     p.start()
#     i = 0
#     while i<10:
#         time.sleep(1)
#         i+=1
#     print("主进程结束")


# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process


def func():
    while True:
        time.sleep(1)
        print("我还活着")


if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True  # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程
    p.start()
    i = 0
    while i < 5:
        time.sleep(1)
        i += 1
    print("主进程代码结束")

其他的方法:

from multiprocessing import Process
import time
def func(name):
    print("%s在test...."%name)

if __name__ == "__main__":
    p = Process(target=func,args=("andy",))
    p.start()
    print(p.is_alive())  # # 判断一个进程是否活着
    p.terminate()  # 结束一个进程,
    time.sleep(1)
    print(p.is_alive())

##################       进程锁              #####################

"""
互斥锁:
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理, 他们之间的运行没有顺序,一旦开启也不受我们控制。 尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。 当多个进程使用同一份数据资源的时候,就会因为竞争而引发数据安全或顺序混乱问题。
"""

下面的代码演示了不同的任务争抢一个资源(终端输出)的场景。

from multiprocessing import Process
import time
import random


def task1():
    print('这是 task1 任务'.center(30, '-'))
    print('task1 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task1 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task1 走出了洗手间')


def task2():
    print('这是 task2 任务'.center(30, '-'))
    print('task2 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task2 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task2 走出了洗手间')


def task3():
    print('这是 task3 任务'.center(30, '-'))
    print('task3 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task3 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task3 走出了洗手间')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)

    p1.start()
    p2.start()
    p3.start()

"""
---------这是 task1 任务----------
task1 进了洗手间
---------这是 task2 任务----------
task2 进了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task1 办事呢...
task3 走出了洗手间
task2 办事呢...
task2 走出了洗手间
task1 走出了洗手间
"""

通过加锁来控制

from multiprocessing import Process, Lock
import time
import random

# 生成一个互斥锁
mutex_lock = Lock()


def task1(lock):
    # 锁门
    lock.acquire()
    print('这是 task1 任务'.center(30, '-'))
    print('task1 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task1 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task1 走出了洗手间')
    # 释放锁
    lock.release()


def task2(lock):
    # 锁门
    lock.acquire()
    print('这是 task2 任务'.center(30, '-'))
    print('task2 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task2 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task2 走出了洗手间')
    # 释放锁
    lock.release()


def task3(lock):
    # 锁门
    lock.acquire()
    print('这是 task3 任务'.center(30, '-'))
    print('task3 进了洗手间')
    time.sleep(random.randint(1, 3))
    print('task3 办事呢...')
    time.sleep(random.randint(1, 3))
    print('task3 走出了洗手间')
    # 释放锁
    lock.release()


if __name__ == '__main__':
    p1 = Process(target=task1, args=(mutex_lock, ))
    p2 = Process(target=task2, args=(mutex_lock, ))
    p3 = Process(target=task3, args=(mutex_lock, ))

    # 释放新建进程的信号,具体谁先启动无法确定
    p1.start()
    p2.start()
    p3.start()

"""
---------这是 task2 任务----------
task2 进了洗手间
task2 办事呢...
task2 走出了洗手间
---------这是 task1 任务----------
task1 进了洗手间
task1 办事呢...
task1 走出了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task3 走出了洗手间


"""

买票的案例:

并发出错:

from multiprocessing import Process, Lock
import json
import time
import random
import os


def search():
    time.sleep(0.5)
    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
        print('剩余票数:{}'.format(data.get('count')))


def buy():

    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    if data.get('count', 0) > 0:
        data['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf8') as f2:
            json.dump(data, f2)
        print('{}购票成功!'.format(os.getpid()))
    else:
        print('购票失败')


def task():
    search()  # 查票并发
    buy()  # 串行买票


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task)
        p.start()

加上锁

from multiprocessing import Process, Lock
import json
import time
import random
import os

# 设置互斥锁
mutex_lock = Lock()


def search():
    time.sleep(0.5)
    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
        print('剩余票数:{}'.format(data.get('count')))


def buy():

    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    if data.get('count', 0) > 0:
        data['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf8') as f2:
            json.dump(data, f2)
        print('{}购票成功!'.format(os.getpid()))
    else:
        print('购票失败')


def task(lock):
    search()  # 查票并发
    lock.acquire()
    buy()  # 串行买票
    lock.release()


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task, args=(mutex_lock, ))
        p.start()

###############         进程间的通信         ##############

"""
进程间的三种通信(IPC)方式:

方式一:队列(推荐使用)

方式二:管道(不推荐使用,了解即可)
管道相当于队列,但是管道不自动加锁

方式三:共享数据(不推荐使用,了解即可)
共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据

"""

###############    进程间的通信---队列   ##############

"""
Queue介绍
我们可以创建一个共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 Queue的实例q常用方法: ################################### Queue([maxsize]) 创建共享的进程队列。 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。 q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。 block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。 timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。 block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。 timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。 此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。 在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。 也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
"""

基本的队列操作:

'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
           # 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print('队列已经满了')

# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    print('队列已经空了')

print(q.empty()) #空了

上面还没有设计到进程间的通信,下面看一个简单的主进程和子进程之间通信的例子:

import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'hi', 'hello'])  #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。

if __name__ == '__main__':
    q = Queue() #创建一个Queue对象
    p = Process(target=f, args=(q,)) #创建一个进程
    p.start()
    print(q.get())
    p.join()

############       生产者消费者模型        ##############

"""
什么是生产者消费者模式?两个角色、一个场所
两个角色:
产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;
一个场所:
生产者和消费者之间的中介就叫做缓冲区。

为什么要使用生产者和消费者模式?
如果不使用这种模式,
那么生产者就必须等待消费者处理完,才能继续生产数据。这就阻塞了,不能并发,
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

使用了这种模式:
解决生产者和消费者的强耦合问题,生产者不需要等待消费者消费完了才可以生产了,而是直接扔给阻塞队列,
消费者也不需要等待生产者了,直接到阻塞队列取数据,
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

生产者/消费者模型的优点:
1、解耦,即降低生产者和消费者之间的依赖关系。
2、支持并发,即生产者和消费者可以是两个独立的并发主体,互不干扰的运行。
3、支持忙闲不均,平衡了生产者和消费者的处理能力
"""

# 队列的生产者和消费者模型
# 买包子的例子
# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,
# 实际中,可能会有数据供需不平衡的问题,
# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产
# 数据消费的多了,我们要增加生产者,来解决这个问题,

# 基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.put(None) #发送结束信号
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #发送结束信号
    print('')

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几次结束信号None
    q.put(None) #发送结束信号
    print('')

# 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
# 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue

JoinableQueue([maxsize]) 
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

"""
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done()
使用者使用此方法发出信号,表示q.get()
返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()
方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

"""

from multiprocessing import Process, JoinableQueue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))
        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走了


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))
    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨头', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 开始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('')

    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

###################################################

原文地址:https://www.cnblogs.com/andy0816/p/12289717.html