python D31 守护进程、进程锁、队列

今日内容:

守护进程:主进程的代码运行结束,守护进程跟着结束

进程同步:

同步锁(互斥锁)  ***** 重点

保证数据安全用的,但是将锁起来的那段代码的执行变成了同步串行,牺牲了效率,保证了安全

L = Lock()

L.acquire()

数据操作

L.release()

信号量  

Semaphore 内部维护了一个计数器,acquire - 1,release + 1,当计数器为0的时候,阻塞其他进程

事件  

e = Event, 事件初识状态为False,当事件为Flase,e.wait()的地方会阻塞,e.set()e的状态改为True,如果e的状态为True,wait的地方就不等待了,不阻塞了,e.clear()e的状态改为False,  查看e的状态:is_set,

进程通信(IPC)

队列  ***** 重点

队列:进程安全的,能够保证数据安全,

Q = queue(4) ,给队列设定一个固定的长度,q.put(),q.get(),q.get_nowait(),q.get(False),

Q.put()在队列满了的时候会阻塞和q.get在队列空了之后会阻塞

Q.put_nowait()

Q.full q.empty

生产者消费者模型

缓冲区解耦的事情

Joinablequeue,能记录着你往队列里面put的数据量

其他方法和queue是一样的,queue多了两个方法:q.task_done,q.join

一、守护进程

主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)
        time.sleep(3)
if __name__ == '__main__':
    p=Myprocess('太白')
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    # time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id
    print('')
守护进程

二、进程同步—加锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p=Process(target=work,args=(i,))
        p.start()

# 看结果:通过结果可以看出两个问题:问题一:每个进程中work函数的第一个打印就不是按照我们for循环的0-4的顺序来打印的
#问题二:我们发现,每个work进程中有两个打印,但是我们看到所有进程中第一个打印的顺序为0-2-1-4-3,但是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明我们一个进程中的程序的执行顺序都混乱了。
#问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,因为进程开到了内核,有操作系统来决定进程的调度,我们自己控制不了
# 0: 9560 is running
# 2: 13824 is running
# 1: 7476 is running
# 4: 11296 is running
# 3: 14364 is running

# 2:13824 is done
# 1:7476 is done
# 0:9560 is done
# 3:14364 is done
# 4:11296 is done
多进程抢占输出资源,导致打印混乱
#由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(n,lock):
    #加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于所有写上这个锁的进程,大家都变成了串行
    lock.acquire()
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(1)
    print('%s:%s is done' %(n,os.getpid()))
    #解锁,解锁之后其他进程才能去执行自己的程序
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(5):
        p=Process(target=work,args=(i,lock))
        p.start()

#打印结果:
# 2: 10968 is running
# 2:10968 is done
# 0: 7932 is running
# 0:7932 is done
# 4: 4404 is running
# 4:4404 is done
# 1: 12852 is running
# 1:12852 is done
# 3: 980 is running
# 3:980 is done

#结果分析:(自己去多次运行一下,看看结果,我拿出其中一个结果来看)通过结果我们可以看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪个进程的程序是不固定的,但是我们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,然后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明我们控制住了同一进程中的代码执行顺序,如果涉及到多个进程去操作同一个数据或者文件的时候,就不担心数据算错或者文件中的内容写入混乱了。
加锁,保证数据安全,牺牲效率
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

IPC通信机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制,
比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一
部分Linux的通信方式。
进程锁总结

信号量

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
比如大保健:提前设定好,一个房间只有4个床(计数器现在为4),那么同时只能四个人进来,谁先来的谁先占一个床(acquire,计数器减1),4个床满了之后(计数器为0了),第五个人就要等着,等其中一个人出来(release,计数器加1),他就去占用那个床了。

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 占到一间ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
信号量使用

三、队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。

q = Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
queue方法介绍

queue的其他方法(了解)

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
#看下面的队列的时候,按照编号看注释
import time
from multiprocessing import Process, Queue

# 8. q = Queue(2) #创建一个Queue对象,如果写在这里,那么在windows还子进程去执行的时候,我们知道子进程中还会执行这个代码,但是子进程中不能够再次创建了,也就是这个q就是你主进程中创建的那个q,通过我们下面在主进程中先添加了一个字符串之后,在去开启子进程,你会发现,小鬼这个字符串还在队列中,也就是说,我们使用的还是主进程中创建的这个队列。
def f(q):
    # q = Queue() #9. 我们在主进程中开启了一个q,如果我们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新创建的这q里里面了
    q.put('姑娘,多少钱~')  #4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    # print(q.qsize()) #6.查看队列中有多少条数据了

def f2(q):
    print('》》》》》》》》')
    print(q.get())  #5.取数据
if __name__ == '__main__':
    q = Queue() #1.创建一个Queue对象
    q.put('小鬼')

    p = Process(target=f, args=(q,)) #2.创建一个进程
    p2 = Process(target=f2, args=(q,)) #3.创建一个进程
    p.start()
    p2.start()
    time.sleep(1) #7.如果阻塞一点时间,就会出现主进程运行太快,导致我们在子进程中查看qsize为1个。
    # print(q.get()) #结果:小鬼
    print(q.get()) #结果:姑娘,多少钱~
    p.join()
基于队列的即时通信

下面我们来看一个叫做生产者消费者模型的东西:

      在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。

    通俗的解释:看图说话。。背景有点乱,等我更新~~

    

    那么我们基于队列来实现一个生产者消费者模型,代码示例:

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')
基于生产者消费者模型

生产者消费者模型总结:

#生产者消费者模型总结

#程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)

#引入生产者消费者模型为了解决的问题是:
平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

#如何实现:
生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和

生产者消费者模型总结

通过上面基于队列的生产者消费者代码示例,我们发现一个问题:主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

    解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(5):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.put(None) #在自己的子进程的最后加入一个结束信号
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()

    print('')
子进程生产者在生产完毕后发送结束信号None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()

    p1.join() #等待生产者进程结束
    q.put(None) #发送结束信号
    print('')
主进程在生产者生产完毕后发送结束信号None

但上述解决方式,在有多个生产者和多个消费者时,由于队列我们说了是进程安全的,我一个进程拿走了结束信号,另外一个进程就拿不到了,还需要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,我们则需要用一个很low的方式去解决

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))



if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几次结束信号None
    q.put(None) #发送结束信号
    print(''
有多个消费者和生产者的时候需要发送多次结束信号

JoinableQueue([maxsize]) 

#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
        q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走并执行完了

def producer(name,q):
    for i in range(10):
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    print('%s生产结束'%name)
    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    print('%s生产结束~~~~~~'%name)

if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True #如果不加守护,那么主进程结束不了,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的所有的人任务都已经被处理完了
    p2.join()
    p3.join()
    print('')
    
    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了
JoinableQueue队列实现生产者消费者模型

四、时间

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True
from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #创建一个事件对象
print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可通过set方法改为True
print('look here!')
# e.set()          #将is_set()的状态改为True。
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
# e.clear()        #将is_set()的状态改为False
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的状态 set-->True   clear-->False
#is_set     用来查看一个事件的状态
#wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
时间的使用方法
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
            print('33[31m红灯亮33[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
            print('33[32m车%s 看见绿灯亮了33[0m' % n)
            time.sleep(random.randint(2,4))
            if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                continue
            print('车开远了,car', n)
            break

# def police_car(e, n):
#     while True:
#         if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
#             print('33[31m红灯亮33[0m,car%s等着' % n)
#             e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
#             if not e.is_set():
#                 print('33[33m红灯,警车先走33[0m,car %s' % n)
#             else:
#                 print('33[33;46m绿灯,警车走33[0m,car %s' % n)
#         break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->将is_set()的值设置为False
        else:
            e.set()    # ---->将is_set()的值设置为True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 创建10个进程控制10辆车
        time.sleep(random.random(1, 3))    #车不是一下子全过来
        p.start()

    # for i in range(5):
    #     p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
    #     p.start()

    #信号灯必须是单独的进程,因为它不管你车开到哪了,我就按照我红绿灯的规律来闪烁变换,对吧
    t = Process(target=traffic_lights, args=(e, 5))  # 创建一个进程控制红绿灯
    t.start()

    print('预备~~~~开始!!!')
通过事件模拟红绿灯实例






原文地址:https://www.cnblogs.com/z520h123/p/10034280.html