队列,生产消费模型,进程池

1,队列:填值,取值 

  put,get

from multiprocessing import Queue
q = Queue(3) #不填默认
q.put(1)
q.put(2)
q.put(3)
# q.put(4) #Full ,填不了值,会一直阻塞
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) #取不了值,会一直阻塞
# print(666) #

  put_nowait,get_nowait

import queue
from multiprocessing import Queue
q = Queue(10)
num = 0
while True:
    try:
        q.put_nowait(num)
        num += 1
    except queue.Full:
        print('填值完毕')
        break
while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        print('取值完毕')
        break

  

2.生产者消费模型:

import time
import random
from multiprocessing import Process,Queue
def customer(q,name):
    while True:
        f = q.get()
        print(f)
        if f is None:
            break
        time.sleep(random.uniform(0.5,0.7))
        print('%s吃了%s'%(name,f))
def producer(q,name,food):
    for i in range(10):
        time.sleep(random.uniform(0.1,0.5))
        print('%s生产了%s%s' % (name,i,food))
        q.put(str(i)+food)
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'狗子','骨头'))
    p1.start()
    p = Process(target=customer,args=(q,'小狗'))
    p.start()
    p1.join()
    q.put(None)

3:进程池:开启过多的进程并不能提高效率,反而会降低效率.

    过多的进程,会造成阻塞,占用内存,操作系统需要对这些进程分配到各个CPU中进行处理

    计算机两种类型:

      计算型:会充分占用CPU:写的程序是计算型程序,适合使用,多进程可以充分利用多核

      适合开启多进程,不适合开启超过CPU数量过多的进程

      IO型:大部分在阻塞队列里,而不是在运行状态中:大部分时间在等待(如爬虫,文件操作)

        不适合开启多进程

  

import time
from multiprocessing import Process,Pool
def func(i):
    time.sleep(0.1)
    print('第%s件衣服制作完毕'%i)
if __name__ == '__main__':
    p1 = Pool(8)   #开启多进程,关闭的时候,一次性全关闭:多进程,高计算,少阻塞时使用
    start = time.time()
    for i in range(1000):
        p1.apply_async(func,args=(i,))
    p1.close()
    p1.join()
    print(time.time() - start)

    # p2 = Process()#开启一个进程,执行完毕后关闭一个
    # p_lst = []
    # start = time.time()
    # for i in range(1000):
    #     p2 = Process(target=func,args=(i,))
    #     p2.start()
    #     p_lst.append(p2)
    # for p in p_lst:
    #     p2.join()
    # print(time.time() - start)

  进程池,同步提交

import time
import os
from multiprocessing import Pool
def task(num):
    time.sleep(0.5)
    print('%s:%s'%(num,os.getpid()))
if __name__ == '__main__':
    p = Pool()
    for i in range(40):
        p.apply(task,args=(i,)) #提交任务的方法 : 同步提交

  

主进程与子进程的数据是隔离的,但是进程池的类实现了返回值 :ipc机制

import time
import os
from multiprocessing import Pool
def task(num):
    time.sleep(0.5)
    print('%s:%s'%(num,os.getpid()))
    return num**2
if __name__ == '__main__':
    p = Pool()
    for i in range(40):
        res = p.apply(task,args=(i,)) #提交任务的方法 : 同步提交
                          #apply同步提交,一个一个顺序执行,没有并发的效果
print(res) #主进程与子进程的数据是隔离的,但是进程池的类实现了返回值 :ipc机制

  异步提交:apply_async

    没有返回值的情况下

import time
import os
from multiprocessing import Pool
def task(num):
    time.sleep(0.5)
    print('%s:%s'%(num,os.getpid()))
    return num**2
if __name__ == '__main__':
    p = Pool(os.cpu_count()+1)
    res_lst = []
    for i in range(40):
        res = p.apply_async(task,args=(i,)) #提交任务的方法 : 异步提交
        # print(res)  #主进程与子进程的数据是隔离的,但是进程池的类实现了返回值 :ipc机制
        # res_lst.append(res)
    # for res in res_lst:
    #     print(res.get())
    p.close()  #无法再提交,子进程继续执行
    p.join()

  有返回值,get不能再提交任务之后立刻执行,需要先提交所有的任务再通过get获取值

  

import time
import os
from multiprocessing import Pool
def task(num):
    time.sleep(0.5)
    print('%s:%s'%(num,os.getpid()))
    return num**2
if __name__ == '__main__':
    p = Pool(os.cpu_count()+1)
    res_lst = []
    for i in range(40):
        res = p.apply_async(task,args=(i,)) #提交任务的方法 : 异步提交
        res_lst.append(res)
    for res in res_lst:
        print(res.get())

  进程池中的map()方法,

  异步提交的简化版本,

  自带close 和join方法

  不带返回值,要带可以把返回值先放到队列中,再get获取

import time
import os
from multiprocessing import Pool
def task(num):
    time.sleep(0.5)
    print('%s:%s'%(num,os.getpid()))
    return num**2
if __name__ == '__main__':
    p = Pool()
    p.map(task,range(20))   

  

   

  

原文地址:https://www.cnblogs.com/lijinming110/p/9682864.html