39 线程池 同一进程间的队列

主要内容

1 . 队列

from multiprocessing import Queue# 是用于多进程的队列,就是专门用来做进程间通信(IPC)。
import queue# 是用于同一进程内的队列,不能做多进程之间的通信
q = queue.Queue() # 先进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())

q = queue.LifoQueue()# 后进先出的队列
q.put(1)
q.put(2)
q.put(3)
print(q.get())

q = queue.PriorityQueue()
# 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据
# 优先级如果是数字,直接比较数值
# 如果是字符串,是按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则
# q.put((1,'abc'))
# q.put((5,'qwe'))
# q.put((-5,'zxc'))
# print(q.get())
# print(q.get())
# print(chr(48))

2 . 线程池

a : 定义 : 在一个池子里,放固定数量的线程,这些线程等待任务,一旦有任务来,就有线程自发的去执行任务。

b :  concurrent.futures 这个模块是异步调用的机制,  concurrent.futures 提交任务都是用submit

c : shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。

d : 把多个任务扔进池中的方法, 及拿结果的方式

     @ for + submit  ,拿结果用result方法,

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
from multiprocessing import Pool
def func(num):
    sum = 0
    for i in range(num):
        sum = sum + i ** 2
    return sum
if __name__ == '__main__':
    li = []
    t = ThreadPoolExecutor(20)
    for i in range(100):         # for + submit方式执行任务.
        re = t.submit(func, i)   #<Future at 0x1c31cccb128 state=finished returned int>
        li.append(re)   
    t.shutdown()    #相当于close+ join 
    [print(re.result()) for re in li]

  @map(func , iterable) 方式去提交多个任务,结果是一个生成器, 采用__next的方法拿结果.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
from multiprocessing import Pool
def func(num):
    sum = 0
    for i in range(num):
        sum = sum + i ** 2
    return sum
if __name__ == '__main__':
    t = ThreadPoolExecutor(20)

    re = t.map(func, range(100))   #<generator object Executor.map.<locals>.result_iterator at 0x000001B12D88BFC0>
    print(re)
    t.shutdown()
    print(re.__next__())    # 通过__next取值.
    print(re.__next__())
    print(re.__next__())

e : 进程池线程池的效率对比

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import time
def func(num):
    sum = 0
    for i in range(num):
        for j in range(i):
            for x in range(j):
                sum += x ** 2
    print(sum)
if __name__ == '__main__':
# pool的进程池的效率演示
    p = Pool(5)
    start = time.time()
    for i in range(100):
        p.apply_async(func,args=(i,))
    p.close()
    p.join()
    print('Pool进程池的效率时间是%s'%(time.time() - start))
# 多进程的效率演示
    tp = ProcessPoolExecutor(5)
    start = time.time()
    for i in range(100):
        tp.submit(func, i)
    tp.shutdown()  # 等效于 进程池中的 close + join
    print('进程池的消耗时间为%s' % (time.time() - start))
# 多线程的效率
    tp = ThreadPoolExecutor(20)
    start = time.time()
    for i in range(100):
        tp.submit(func,i)
    tp.shutdown()# 等效于 进程池中的 close + join
    print('线程池的消耗时间为%s'%(time.time() - start))

  结果 : 针对计算密集的程序来说

     不管是pool的进程池还是ProcessPoolExecutor()的进程池, 执行效率相当

     ThreadPoolExecutor 的效率要差好多.

        所以, 当计算密集时, 使用多进程.

f : 线程池中的回调函数

from threading import Thread
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
    time.sleep(1)
    print('这是在子线程中 ', current_thread())
    return num
def call_back(re):
    time.sleep(1)
    print('这是在回调函数中', current_thread())
if __name__ == '__main__':
    t = ThreadPoolExecutor(5)
    for i in range(100):
        t.submit(func, i).add_done_callback(call_back)
    t.shutdown()
    print('这是在主线程中', current_thread())

# 线程池中的回调函数是子线程调用的,跟父进程没有关系, 不可以用os.getpid()查看, 因为多个线程共享同一个pid, 用current_thread实现

 无论是Processpoolexecutor 还是 pool中的回调函数都是父进程调用的, 跟子进程没有关系.(可以用os.getpid()查看)

  

原文地址:https://www.cnblogs.com/gyh412724/p/9544295.html