线程池

定义:

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆​栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

功能:

应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以轮循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,框架为每个进程提供了一个线程池,一个线程池有若干个等待操作状态,当一个等待操作完成时,线程池中的辅助线程会执行回调函数。线程池中的线程由系统管理,程序员不需要费力于线程管理,可以集中精力处理应用程序任务。

应用范围:

1、需要大量的线程来完成任务,且完成任务的时间比较。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。

一个简单的例子:

(需要手动添加池线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import  Queue
import threading
 
class ThreadPool(object):
    def __init__(self,max_num=20):
        self.queque=Queue.Queue(max_num)
        for i in xrange(max_num):
            self.queque.put(threading.Thread)   #指向类,不创建对象,省内存空间,本段代码唯一可取的地方。
    def get_thread(self):
        return self.queque.get()
    def add_thread(self):
        self.queque.put(threading.Thread)   #low就low在这儿,得手动添加池线程
 
pool = ThreadPool(10)     #创建最多接受10个元素的队列
 
def func(arg,p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()       #完成之后再往线程池里加一个线程
 
for i in xrange(30):
    thread = pool.get_thread()
    t= thread(target=func,args=(i,pool))     #threading.Thread(......)

线程池预备知识:

with+contextmanager:上下文管理

1
2
3
4
5
6
7
8
9
10
import contextlib
 
@contextlib.contextmanager  #这个装饰器和with配合,完成上下文切换。
def show():
    print 'before'
    yield
    print 'after'
 
with show():
    print 'with in'

运行结果:

1
2
3
before
with in
after

with监控doing列表+多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import contextlib
import threading
import time
import random
 
doing = []
def num(list2):     #启一个线程,始终在打印doing列表中的数量(正在执行的线程)
    while True:
        print len(list2)
        time.sleep(1)
t = threading.Thread(target=num, args=(doing,))
t.start()     #<Thread(Thread-1, started 2368)>
 
@contextlib.contextmanager
def show(l1,item):
    doing.append(item)
    yield
    doing.remove(item)
 
def task(i):
    flag = threading.current_thread()
    #print "flag:",flag  #flag: <Thread(Thread-2, started 6544)>......
    with show(doing,flag):
        import time
        time.sleep(random.randint(1,4))
 
for i in range(20):    #创建20个线程,每个线程都去执行task函数
    temp = threading.Thread(target=task,args=(i,))
    temp.start()

twisted中的threadpool源码:(精简版)

1、创建一个队列,队列里是(func,args,kwargs,onResult),本例中func=show,args=i

2、start方法设定线程数量workers,根据min(池线程数量,任务数)而定

3、start调用startAWorker方法启动线程,执行_worker方法

4、_worker方法去队列里取任务,执行任务,取任务,执行任务........

5、stop方法给_worker发送一个停止信号(队列中取的),这个线程结束

6、每发送一个停止信号(workstop),线程池中的线程(workers)就减1

7、这个停止信号是put到任务队列里的,所以stop方法是在所有任务最后。

可取之处:上下文管理,停止信号与队列的结合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from Queue import Queue
import contextlib
import threading
 
WorkerStop = object()
 
class ThreadPool:
 
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)
 
    def __init__(self, maxthreads=20, name=None):
 
        self.q = Queue(0)
        self.max = maxthreads
        self.name = name
        self.waiters = []
        self.working = []
 
    def start(self):
        needsiZe = self.q.qsize()
        while self.workers < min(self.max, needSize):
            self.startAWorker()
 
    def startAWorker(self):
        self.workers += 1
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
        newThread = self.threadFactory(target=self._worker, name=name)
        newThread.start()
 
    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)
 
    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)
 
    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)
 
    def _worker(self):
        ct = self.currentThread()
        o = self.q.get()
        while o is not WorkerStop:
            with self._workerState(self.working, ct):
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    if onResult is None:
                        pass
                    else:
                        pass
                del function, args, kwargs
 
                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        #context.call(ctx, log.err)
                        pass
                del onResult, result
 
            with self._workerState(self.waiters, ct):
                o = self.q.get()
 
    def stop(self):
        while self.workers:
            self.q.put(WorkerStop)
            self.workers -= 1
 
"""
def show(arg):
    import time
    time.sleep(1)
    print arg
 
 
pool = ThreadPool(20)
 
for i in range(500):
    pool.callInThread(show, i)
 
pool.start()
pool.stop()
"""

部分来源:http://www.cnblogs.com/wupeiqi/articles/4839959.html























原文地址:https://www.cnblogs.com/daliangtou/p/5125674.html