生产者消费者模型 线程池

1.生产者消费者模型

  主要是为解耦

  借助队列来实现生产者消费这模型

  栈:先进后出(First In Last Out 简称:FILO)

  队列:先进先出(FIFO)

import queue

  from multiprocessing import Queue 借助Queue解决生产者消费这模型队列是安全的

  q=Queue(m)

  

q = Queue(num)
num : 队列的最大长度
q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
q.put()# 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

q.get_nowait()# 不阻塞,如果有数据直接获取,没有数据就报错
q.put_nowait()# 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

(2)from multiprocessing import JoinableQueue#可连接的队列
JoinableQueue是继承Queue,所以可以使用Queue中的方法
并且JoinableQueue又多了两个方法
q.join()# 用于生产者。等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。

2.管道

from multiprocessing import Pipe
con1,con2 = Pipe()
管道是不安全的。
管道是用于多进程之间通信的一种方式。
如果在单进程中使用管道,那么就是con1收数据,就是con2发数据。
如果是con1发数据,就是con2收数据

如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发
父进程使用con1发,子进程就必须使用con2收
父进程使用con2收,子进程就必须使用con1发
父进程使用con2发,子进程就必须使用con1收
在管道中有一个著名的错误叫做EOFError。是指,父进程中如果关闭了发送端,子进程还继续接收数据,那么就会引发EOFError。

3 进程之间的共享内存
from multiprocessing import Manager,Value
m = Manager()
num = m.dict({键 : 值})
num = m.list([1,2,3])

4 进程池
进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。
因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数
开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让
cpu帮你调度它。
进程池还会帮程序员去管理池中的进程。
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)

进程池有三个方法:
map(func,iterable)
func:进程池中的进程执行的任务函数
iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数

apply(func,args=()): 同步的效率,也就是说池中的进程一个一个的去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
同步处理任务时,不需要close和join
同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

apply_async(func,args=(),callback=None): 异步的效率,也就是说池中的进程一次性都去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)
异步处理任务时,必须要加上close和join

回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数

原文地址:https://www.cnblogs.com/wszxdzd/p/9519815.html