Manage,管道的简单应用,进程池,队列的简单应用

day37---Manage,管道的简单应用,进程池,队列的简单应用

今日内容:

1 生产者消费者模型
主要是为解耦
借助队列来实现生产者消费者模型

栈:先进后出(First In Last Out 简称 FILO)
队列: 先进先出(First In First Out 简称 FIFO)


import queue # 不能进行多进程之间的数据传输
(1)from multiprocessing import Queue 借助Queue解决生产者消费者模型
队列是安全的。
q = Queue(num)
num : 队列的最大长度
q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
q.put()# 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

q.get_nowait()# 不阻塞,如果有数据直接获取,没有数据就报错
q.put_nowait()# 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

(2)from multiprocessing import JoinableQueue#可连接的队列
JoinableQueue是继承Queue,所以可以使用Queue中的方法
并且JoinableQueue又多了两个方法
q.join()# 用于生产者。等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。

2 管道(了解)
from multiprocessing import Pipe
con1,con2 = Pipe()
管道是不安全的。
管道是用于多进程之间通信的一种方式。
如果在单进程中使用管道,那么就是con1收数据,就是con2发数据。
如果是con1发数据,就是con2收数据

如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发
父进程使用con1发,子进程就必须使用con2收
父进程使用con2收,子进程就必须使用con1发
父进程使用con2发,子进程就必须使用con1收
在管道中有一个著名的错误叫做EOFError。是指,父进程中如果关闭了发送端,子进程还继续接收数据,那么就会引发EOFError。
from multiprocessing import Pipe

con1, con2 = Pipe()

con1.send('abc')
print(con2.recv())
con2.send(123)
print(con1.recv())
单线程内的管道


from multiprocessing import Pipe, Process


def func(con):
    con1, con2 = con
    con1.close()  # 子进程使用con2和父进程通信,所以
    while 1:
        try:
            print(con2.recv())  # 当主进程的con1发数据时,子进程要死循环的去接收。
        except EOFError:  # 如果主进程的con1发完数据并关闭con1,子进程的con2继续接收时,就会报错,使用try的方式,获取错误
            con2.close()  # 获取到错误,就是指子进程已经把管道中所有数据都接收完了,所以用这种方式去关闭管道
            break


if __name__ == '__main__':
    con1, con2 = Pipe()
    p = Process(target=func, args=((con1, con2),))
    p.start()
    con2.close()  # 在父进程中,使用con1去和子进程通信,所以不需要con2,就提前关闭
    for i in range(10):  # 生产数据
        con1.send(i)  # 给子进程的con2发送数据
    con1.close()  # 生产完数据,关闭父进程这一端的管道
多进程下的管道


3 进程之间的共享内存
from multiprocessing import Manager,Value
m = Manager()
num = m.dict({键 : 值})
num = m.list([1,2,3])
# Manager  子进程和主进程之间共享内存的
from multiprocessing import Process, Manager


def func(num):
    num[0] -= 1
    print('子进程中的num的值是', num)


if __name__ == '__main__':
    m = Manager()                       # 实例化一个Manager 对象
    num = m.list([1, 2, 3])
    print(num)
    p = Process(target=func, args=(num,))   #  开启一个进程
    p.start()                               # 执行进程
    p.join()                                # 进程设置为同步
    print('父进程中的num的值是', num)
Manager共享内存


4 进程池
进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。
因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数
开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让
cpu帮你调度它。
进程池还会帮程序员去管理池中的进程。
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)

进程池有三个方法:
map(func,iterable)
func:进程池中的进程执行的任务函数
iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数

apply(func,args=()): 同步的效率,也就是说池中的进程一个一个的去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
同步处理任务时,不需要close和join
同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

apply_async(func,args=(),callback=None): 异步的效率,也就是说池中的进程一次性都去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)
异步处理任务时,必须要加上close和join


回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数

原文地址:https://www.cnblogs.com/kcwxx/p/9548918.html