Python 线程池

python默认没有提供线程池的功能,所以要想使用线程池,就必要使用第三方的模块或者自定义线程

线程并不是越多越好,线程的上下文切换会影响到服务器的性能

线程池:一个容器,有最大数,取一个少一个,无线程时等待,线程执行完毕,交还线程

__author__ = 'alex'
#coding:utf-8
import queue
import threading
import time

class ThreadPool:
    def __init__(self,maxsize=5):
        self.maxsize = maxsize
        self._q = queue.Queue()
        for i in range(5):
            self._q.put(threading.Thread)

    def get_thread(self):
        return self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)

pool = ThreadPool(5)

def task(args,p):
    print (args )
    time.sleep(2)
    p.add_thread()


for i in range(100):
    t = pool.get_thread()
    obj = t(target=task,args =(i,pool))
    obj.start()

这个简单的程序实现了线程池的基本功能,每次只能有5个线程产生,但是也有很大的局限性,1,线程不能回收回来,每次产生了5个线程,但是线程确不能收回来(垃圾回收机制回收回来),每5个线程结束以后,只是重新生成了新的线程;2,如果需要的线程数range(100)小于我们定义的创建池的数值(5),那就浪费了,根本没有必要初始就创立5个线程(线程池一下子就开到了最大)。

 第二个版本的线程池:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)
        # print ("------->"+self.q)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)

for i in range(10):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()

如何使用第三方的线程池:

1.不适用线程池的情况

#!/usr/bin/env python
import threadpool
import time,random

def func(str):
    time.sleep(1)
    # return str
    print (str)

data = []
for i in range(10):
    data.append(random.randint(1,10))

start = time.clock()

for l in range(len(data)):
    func(str(data[l]))

stop = time.clock()
end = stop - start
print ("Spend time %f",end)

上述执行过程需要约10秒钟,我们使用线程池改写程序

#!/usr/bin/env python
import threadpool
import time,random

def func(str):
    time.sleep(1)
    print (str)
# def print_result(request, result):
#     print ("the result is %s %r" % (request.requestID, result))

data = []
for i in range(10):
    data.append(random.randint(1,10))
# data = [random.randint(1,10) for i in range(20)]

start = time.clock()
pool = threadpool.ThreadPool(5)
# requests = threadpool.makeRequests(hello, data, print_result)
requests = threadpool.makeRequests(func, data)
# # [pool.putRequest(req) for req in requests]
for req in requests:
    pool.putRequest(req)
pool.wait()
stop = time.clock()
end = stop - start
print ("Spend time %f",end)

改写后程序执行约为2秒钟(跟线程池里可分配得线程数相关),同时要注意上面的[]里面的写法。

原文地址:https://www.cnblogs.com/python-study/p/5828017.html