IPC机制,线程

一.IPC机制

  1.队列,及其方法

from multiprocessing import Process,Queue
# 实例化创建队列
"""
队列中的方法:
    1.put,向队列中添加数据
    2.get,从队列中取数据
    3,full,判断队列是否满了
    4,empty,判断队列是否为空
    5,get_nowait,从队列取值,没值不等待直接报错
    6,实例化产生队列时,Queue()括号内可以传数字,表示该队列中的数据量多少,不传为默认值(很大默认无穷)
"""
q=Queue(3)  # 最大存储数为3
q.put(1)
q.put(2)
q.put(3)
print(q.full())  # 判断队列是否满了
# q.put(4)  # 不会报错,会卡在这,等从队列中取出一个数据

print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # 判断队列是否为空
# print(q.get())  # 不会报错,会卡在这,等待队列传值
print(q.get_nowait())  # 判断队列是否为空,空直接报错

ps:full,empty,get_nowait都不适用与多进程  

  2.IPC机制(管道加队列)

队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

from multiprocessing import  Process,Queue

def put(q):
    q.put('为社么不是你来放')
    print(q)

def get(q):
    print(q.get('为社么不是你来取'))

if __name__ == '__main__':
    q=Queue()
    p=Process(target=put,args=(q,))
    g=Process(target=get,args=(q,))
    p.start()
    g.start()

  3.生产者消费者模型

1.基于队列产生的消费者生产者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')
生产者消费者模型1.0版本

  此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

  解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.put(None) #发送结束信号
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

改良版——生产者消费者模型
2.0版本

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几次结束信号None
    q.put(None) #发送结束信号
    print('')

多个消费者的例子:有几个消费者就需要发送几次结束信号
多个消费者生产者

2.使用JoinableQueue

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join() 
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

方法介绍
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
        q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('') 
    
    #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

JoinableQueue队列实现消费之生产者模型
JoinQueue版本

二.线程

  1.初识线程

什么是线程:
    进程线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物

    进程:资源单位
    线程:执行单位
        将内存比如成工厂
        那么进程就相当于是工厂里面的车间
        而你的线程就相当于是车间里面的流水线
    ps:每个进程都自带一个线程,线程才是真正的执行单位,进程只是在线程运行过程中
    提供代码运行所需要的资源

为什么要有线程
    开进程
        1.申请内存空间  耗资源
        2."拷贝代码"    耗资源

    开线程
        一个进程内可以起多个线程,并且线程与线程之间数据是共享的
    ps:开启线程的开销要远远小于开启进程的开销

  2.创建线程的两种方法

import time
from threading import Thread

def eat(name):
    print('%s is eatting'%name)
    time.sleep(3)
    print('%s is waking'%name)

t=Thread(target=eat,args=('xu',))  # 创建线程可以不再main内,但是进程必须再,
t.start()
print('')


class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is eatting' % self.name)
        time.sleep(3)
        print('%s is waking'%self.name)

if __name__ == '__main__':
    t=MyThread('xu')
    t.start()
    print('')

ps:创建线程的代价远小于进程,所以在向系统发出创建线程的命令时,可能线程就创建好了

  3.线程对象及其方法

from threading import Thread,current_thread,active_count
import time
import os

def task(name,i):
    print('%s is running'%name)
    print('子current_thread:',current_thread().name)  # 获取线程名
    print('',os.getpid())  # 获取进程的pid码
    time.sleep(i)

    print('%s is over'%name)
# 开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
t = Thread(target=task,args=('egon',1))
t1 = Thread(target=task,args=('jason',2))
t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
t1.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
t1.join()  # 主线程等待子线程运行完毕
print('当前正在活跃的线程数',active_count())  # 获取活着的线程
# 小的代码执行完 线程就已经开启了
print('')
print('主current_thread:',current_thread().name)
print('',os.getpid())

ps:正在活跃的线程数为1是因为在等待t1的过程中t早已结束,所以存活的进程数为1只剩下主进程了

  4.守护线程

from threading import Thread,current_thread
import time



def task(i):
    print(current_thread().name)
    time.sleep(i)
    print('GG')
# for i in range(3):
#     t = Thread(target=task,args=(i,))
#     t.start()
t = Thread(target=task,args=(1,))
t.daemon = True
t.start()
print('')
# 主线程运行结束之后需要等待子线程结束才能结束呢?
"""
主线程的结束也就意味着进程的结束
主线程必须等待其他非守护线程的结束才能结束
(意味子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了资源也就销毁了)
"""
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
迷惑人的小例子

出现上述结果的原因是因为在程序运行中,主线程的结束意味着进程的结束,进程一结束,假设内部线程还未结束,线程就没有资源可以使用了,所以主线程会等待所有非守护线程结束才结束.所以在这个立体中主线程会等待bar线程的结束才结束,而当bar线程结束时foo已经执行完了,所以输出结果如上图所示

  5.线程间通信

from threading import Thread


money = 666

def task():
    global money
    money = 999

t = Thread(target=task)
t.start()
t.join()
print(money)  # 999

ps:进程的创建是在内存中开辟一块空间,而线程是在进程的内部的,所以在同一进程内部的线程都是使用那个进程中的资源,所以线程间的通信的没有隔阂的

  6.线程互斥锁

from threading import Thread,Lock
import time


n = 100

def task(mutex):
    global  n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
mutex = Lock()
for i in range(100):
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(n)
原文地址:https://www.cnblogs.com/z929chongzi/p/11340842.html