一个别人编写的线程池实现

自定义了线程池的start方法:只有在调用start时才真正创建工作线程,线程数取线程池上限与队列中任务数两者的较小值(min),

而不是预先开启全部线程等待任务,在一定程度上避免了空闲线程对内存的消耗。

并且引入了contextlib.contextmanager,这个装饰器使得函数可以配合with语句使用。

# -*- coding:utf-8 -*-
import contextlib
import logging
import threading
from queue import Queue

# Unique sentinel placed on the task queue to tell one worker thread to exit.
WorkerStop = object()


class ThreadPool:
    """Thread pool whose worker threads are created lazily by ``start()``.

    Tasks are queued with ``callInThread`` / ``callInThreadWithCallback``.
    ``start()`` then spawns ``min(maxthreads, number of queued tasks)``
    workers instead of always spawning ``maxthreads`` of them, and ``stop()``
    queues one ``WorkerStop`` sentinel per worker so that each worker exits
    after the tasks queued ahead of the sentinels have been taken.

    Tasks queued after ``start()`` are picked up by the existing workers, but
    no additional workers are spawned unless ``start()`` is called again.
    """

    # Number of workers started and not yet asked to stop.
    workers = 0

    # Hooks kept as class attributes so subclasses can substitute them.
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.current_thread)

    _log = logging.getLogger(__name__)

    def __init__(self, maxthreads=20, name=None):
        """Create an idle pool.

        Args:
            maxthreads: Upper bound on the number of worker threads.
            name: Optional label used in worker thread names; ``id(self)``
                is used when it is falsy.
        """
        self.q = Queue(0)  # maxsize 0 -> unbounded task queue
        self.max = maxthreads
        self.name = name
        self.waiters = []  # threads currently blocked waiting for a task
        self.working = []  # threads currently executing a task

    def start(self):
        """Spawn workers until there are ``min(max, queued tasks)`` of them."""
        wanted = min(self.max, self.q.qsize())
        while self.workers < wanted:
            self.startAWorker()

    def startAWorker(self):
        """Start one more worker thread running ``_worker``."""
        self.workers += 1
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
        newThread = self.threadFactory(target=self._worker, name=name)
        newThread.start()

    def callInThread(self, func, *args, **kw):
        """Queue ``func(*args, **kw)`` for execution without a callback."""
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        """Queue ``func(*args, **kw)`` and report its outcome to ``onResult``.

        Args:
            onResult: ``None``, or a callable invoked in the worker thread as
                ``onResult(success, result)``. On success ``result`` is the
                return value of ``func``; on failure ``success`` is ``False``
                and ``result`` is the exception instance that was raised.
            func: The callable to run in a worker thread.
            *args: Positional arguments for ``func``.
            **kw: Keyword arguments for ``func``.
        """
        self.q.put((func, args, kw, onResult))

    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        """Keep ``workerThread`` in ``stateList`` for the ``with`` body."""
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        """Worker loop: run queued tasks until ``WorkerStop`` is received."""
        # Call the hook: the state lists must hold the Thread object itself,
        # not the function that looks it up.
        ct = self.currentThread()
        task = self.q.get()
        while task is not WorkerStop:
            with self._workerState(self.working, ct):
                function, args, kwargs, onResult = task
                del task
                try:
                    result = function(*args, **kwargs)
                    success = True
                except Exception as exc:
                    # Always bind ``result`` so a failing task cannot crash
                    # this worker further down with an unbound local.
                    success = False
                    result = exc
                    if onResult is None:
                        # Nobody is listening for the outcome; log the error
                        # instead of dropping it silently.
                        self._log.exception(
                            "Unhandled error in pool task %r", function
                        )

                # Drop references early so task data can be collected while
                # the callback runs or the worker waits for the next task.
                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except Exception:
                        # A broken callback must not kill the worker.
                        self._log.exception(
                            "Error in pool result callback %r", onResult
                        )

                del onResult, result

            with self._workerState(self.waiters, ct):
                task = self.q.get()

    def stop(self):
        """Queue one ``WorkerStop`` per worker; does not wait for exit."""
        while self.workers:
            self.q.put(WorkerStop)
            self.workers -= 1

# Usage example below.
def show(arg):
    """Pause for one second, then print ``arg`` to standard output."""
    from time import sleep

    sleep(1)
    print(arg)


# Allow up to 20 worker threads; none are created yet.
pool = ThreadPool(20)

# Queue the tasks *before* starting: start() sizes the pool from the number
# of tasks already waiting, so only 3 threads are created here, not 20.
for i in range(3):
    pool.callInThread(show, i)

pool.start()
# stop() queues one stop sentinel per worker behind the pending tasks and
# returns immediately; it does not wait for the workers to finish.
pool.stop()
原文地址:https://www.cnblogs.com/mmyy-blog/p/9568209.html