进程Queue、线程Queue、堆栈、生产者消费者模型

没学队列之前,可以用文件实现进程之间通信

但是有2个问题:

  1. 速度慢:文件是保存在硬盘空间

  2. 为了数据安全要加锁(处理锁是一件很麻烦的事,容易死锁,建议自己轻易不要处理锁)

队列:队列是基于管道加锁实现的

进程Queue

进程队列:

from multiprocessing import Queue

q = Queue(3)
q.put([1,2])
q.put('a')
q.put(1)


print(q.get())
print(q.get())
print(q.get())
##
# [1, 2]
# a
# 1

取值的过程中,可能发生夯住的情况

程序卡住1:取的比放进去的多

from multiprocessing import Queue

q = Queue(3)
q.put(1)


print(q.get())
print(q.get())       # 放了1个,取了2次(第二次管道空了,就是取空),第2次程序卡住了(锁住了)

程序卡住2:放进去的比管道设置的多

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(2)
q.put(4)        #设置管道为3,放进去4个,程序卡住了(锁住了)


print(q.get())
print(q.get())

总之:取的次数不能超过放进去的次数,放进去的次数不能超过管道设置的次数

q = Queue( )

可以不设置大小,但是内存不是无限大的

不可以放大的数据(比如一个视频)(可以放一个视频连接)

线程Queue

线程队列:

import queue

q = queue.Queue(3)
q.put(1)
q.put(2)

print(q.get())
print(q.get())

# 1
# 2

线程--堆栈

队列:先进先出

堆栈:后进先出

import queue

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())


# 3
# 2
# 1

优先级队列

import queue

q = queue.PriorityQueue(3)
q.put((2,'a'))              #参数是一个元组,元组的第一个元素是一个数字,数字越小,优先级越高
q.put((-2,'b'))
q.put((0,'c'))

print(q.get())
print(q.get())
print(q.get())

# (-2, 'b')
# (0, 'c')
# (2, 'a')

生产者和消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

如果只产生数据,不处理数据,就不需要用这个模型。

示例:

  生产者和消费者中间加个队列,结耦

  生产者和队列 消费者和队列

  生产者产生的数据放进队列里面,消费者处理数据就从队列里面拿值

import time, os
from multiprocessing import Process, Queue


def task(q):
    for i in range(10):
        res = '第%s个包子' % (i + 1)
        time.sleep(0.2)
        q.put(res)
        print('%s生产了%s' % (os.getpid(),res))


def eat(q):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(0.5)
        print('路人甲吃了%s' % res)
        


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=task, args=(q,))
    p2 = Process(target=eat, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)

#######
768生产了第1个包子
768生产了第2个包子
768生产了第3个包子
路人甲吃了第1个包子
768生产了第4个包子
768生产了第5个包子
768生产了第6个包子
路人甲吃了第2个包子
768生产了第7个包子
768生产了第8个包子
路人甲吃了第3个包子
768生产了第9个包子
768生产了第10个包子
路人甲吃了第4个包子
路人甲吃了第5个包子
路人甲吃了第6个包子
路人甲吃了第7个包子
路人甲吃了第8个包子
路人甲吃了第9个包子
路人甲吃了第10个包子

Queue是生产者给消费者发送信号

JoinableQueue 消费者给生产者发送信号

import time, os
from multiprocessing import Process, JoinableQueue


def task(q):
    for i in range(3):
        res = '第%s个包子' % (i + 1)
        time.sleep(0.2)
        q.put(res)
        print('%s生产了%s' % (os.getpid(), res))
    q.join()                                       #生产者生产完数据后,查看一共接收了多少个信号,如果信号数量<放进去的数据量,等待;当信号数=放进去的数据数  时,生产者执行完毕(两个数量相等,也说明了处理数据的消费者也把数据处理完了,也需要关闭了)(放在循环外面)
#生产者结束: 消费者确实把所有数据都收到

def eat(q):
    while True:
        res = q.get()
        time.sleep(0.5)
        print('%s吃了%s' % (os.getpid(), res))
        q.task_done()                   #消费者每次拿一个值,就给生产者发送一个信号(放在循环里面,且把数据处理完之后再发送)


if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=task, args=(q,))
    c = Process(target=eat, args=(q,))
    c.daemon = True                 #之所以要把消费者变成守护,是因为生产者可以得到信号,自己结束,但消费者不知道什么时候结束,就会一直等待(从队列取空),变成守护后,下面的p.join()结束后,代表主进程结束,数据处理完了,消费者没必要存在了,正好主进程强制把守护关闭
    p.start()
    c.start()
    p.join()            #等待生产者结束的目的就是关闭守护(消费者没必要存在了)
原文地址:https://www.cnblogs.com/zhzhlong/p/9297377.html