对列 、生产者与消费者

一、对列的特点:先进先出   put()和get()方法

from multiprocessing import Process,Queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

结果:

1
2
3
二、q.get_nowait()和q.put_nowait() 的用法
2.1 当对列中有值,get()直接取值,对列没有值,get()就会阻塞,get_nowait()会直接报错。需要用到 try...except
from multiprocessing import Process,Queue
import queue
q = Queue()
q.put(1)
try:
    print(q.get_nowait())  # 有值取值,没值则queue.Empty报错
except queue.Empty:
    print('empty')

结果:

有值:
1
没值:
empty  

2.2 对列是有长度的,Queue(2)设置长度为2的对列。当持续put(),长度超多时,使用put()程序会阻塞,数据无法放进去,当使用put_nowait(),则直接报错,数据无法放进去。由于数据可能会丢失,所以很少用到这个put_nowait()。

from multiprocessing import Process,Queue
q = Queue(2)
q.put(1)
q.put(2)
q.put_nowait(3)    # 直接报错

三、判断对列是否为空、满

from multiprocessing import Process,Queue
import queue
# 先进先出
q = Queue(2)
q.put(1)
q.put(2)
print(q.empty())
print(q.full())

结果:
False
True

但是在多进程中,这个判断不准

 四、使用对列在进程间的相互通讯

from multiprocessing import Process,Queue
def consume(q):
    print('son:',q.get())
    q.put('asd')
if __name__ == '__main__':
    q = Queue()
    p = Process(target= consume,args=(q,))
    p.start()
    q.put({'123':123})
    p.join()
    print('Foo:',q.get())

结果:
son: {'123': 123}
Foo: asd

五、生产者与消费者

import time
import random
from multiprocessing import Process,Queue   # 默写
def consumer(q,name):
  # 消费者
while True: food = q.get() if food is None:break time.sleep(random.uniform(0.3,0.8)) print('%s吃了一个%s'%(name,food)) def producer(q,name,food):
  # 生产者
for i in range(10): time.sleep(random.uniform(0.3,0.8)) print('%s生产了%s%s'%(name,food,i)) q.put(food + str(i)) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer,args=(q,'alex')) c1.start() p1 = Process(target=producer, args=(q, '杨忠和','泔水')) p1.start() p1.join() q.put(None)

结果为:

杨忠和生产了泔水0
杨忠和生产了泔水1
alex吃了一个泔水0
杨忠和生产了泔水2
alex吃了一个泔水1
杨忠和生产了泔水3
alex吃了一个泔水2
杨忠和生产了泔水4
alex吃了一个泔水3
杨忠和生产了泔水5
alex吃了一个泔水4
杨忠和生产了泔水6
alex吃了一个泔水5
杨忠和生产了泔水7
alex吃了一个泔水6
杨忠和生产了泔水8
alex吃了一个泔水7
杨忠和生产了泔水9
alex吃了一个泔水8
alex吃了一个泔水9

5.2 JoinableQueue 类

import time
import  random
from multiprocessing import Process,JoinableQueue
def consumer(jq,name):
    # 处理数据
    while True:
        food = jq.get()
        time.sleep(random.uniform(0.3,0.8))
        print('%s吃了一个%s'%(name,food))
        jq.task_done()
def producer(jq,name,food):
    # 获取数据
    for i in range(10):
        time.sleep(random.uniform(0.3,0.8))
        print('%s生产了%s%s'%(name,food,i))
        jq.put(food + str(i))
if __name__ == '__main__':
    jq = JoinableQueue()
    c1 = Process(target=consumer,args=(jq,'alex'))
    c2 = Process(target=consumer,args=(jq,'wuser'))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1 = Process(target=producer, args=(jq, '杨宗和','泔水'))
    p2 = Process(target=producer, args=(jq, '何思浩','鱼刺鱼骨头'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    jq.join()
JoinableQueue
put
get
task_done 通知对列已经有一个数据被处理了
q.join() 阻塞直到放入对列中所有的数据都被处理掉(有多少个数据就接受多少个taskdone)


原文地址:https://www.cnblogs.com/youhongliang/p/9681386.html