Python 实现线程池

前言:
关于线程池(thread pool)的概念请参考http://en.wikipedia.org/wiki/Thread_pool_pattern。在Python中使用线程是有硬伤的:因为Python(这里指C语言实现的CPython)的基本调用最后都会生成对应的C语言函数调用,因此Python中使用线程的开销太大,不过可以使用Stackless Python(Python的一个修改版)来增强Python中使用线程的表现。
同时由于Python中GIL(全局解释器锁)的存在,导致Python无法充分利用多个CPU。目前psyco这个模块(一个JIT编译器)可以提高Python代码的执行效率,但它并不能绕过GIL;如果要真正利用多个CPU,可以考虑使用多进程(例如multiprocessing模块)。

在C语言里要实现一个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的函数:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);

而如果用Python来实现一个线程池就好多了,不仅结构十分清晰,而且代码看起来会很优美。


PYTHON:
代码
import logging
import threading
from collections import deque
from time import sleep

class ThreadPool:
    """Flexible thread pool class.

    Creates a pool of worker threads, then accepts tasks that are handed
    out, in FIFO order, to the next available worker.  The pool can be
    resized at any time with setThreadCount() and shut down with
    joinAll(); once joinAll() returns the pool can be reused.
    """

    def __init__(self, numThreads):
        """Initialize the thread pool with numThreads workers."""
        self.__threads = []
        # Guards self.__threads; held for the whole of every resize.
        self.__resizeLock = threading.Lock()
        # Guards self.__tasks.
        self.__taskLock = threading.Lock()
        # deque: O(1) removal from the front (list.pop(0) is O(n)).
        self.__tasks = deque()
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):
        """Set the current pool size.

        External entry point: acquires the resizing lock, then delegates
        to the internal version to do the real work.  Negative sizes are
        treated as 0.

        Returns False (and changes nothing) while joinAll() is shutting
        the pool down, True otherwise.
        """
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):
        """Spawn or retire workers until exactly newNumThreads remain.

        Internal use only; the caller must hold the resizing lock.
        Retired workers are only asked to stop via goAway(); they are
        removed from the pool immediately but are not joined here.
        """
        # A negative target would otherwise index into an empty list.
        newNumThreads = max(0, newNumThreads)
        # Grow the pool.
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # Shrink the pool, retiring the oldest workers first.
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):
        """Return the number of threads in the pool."""
        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.

        task must be callable; a worker later calls task(args) and, if
        taskCallback is not None, passes task's return value to it.

        Returns True if the task was queued, False if the pool is being
        shut down or task is not callable.
        """
        if self.__isJoining or not callable(task):
            return False
        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
        return True

    def getNextTask(self):
        """Remove and return the next (task, args, taskCallback) tuple.

        Returns (None, None, None) when the queue is empty.  For use only
        by ThreadPoolThread objects contained in the pool.
        """
        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.popleft()

    def joinAll(self, waitForTasks=True, waitForThreads=True):
        """Shut the pool down: clear the task queue and stop all workers.

        waitForTasks   -- if true, first let the workers drain the queue.
                          The wait is skipped when the pool has no
                          workers, since it could never finish.
        waitForThreads -- if true, block until every worker has exited.

        Tasks still queued after the optional wait are discarded.  When
        this returns the pool has no workers and accepts tasks again.
        """
        # Mark the pool as joining to prevent any more task queueing.
        self.__isJoining = True

        # Wait for queued tasks to be picked up, but only while some
        # worker is left to pick them up.
        if waitForTasks:
            while self.__tasks and self.getThreadCount() > 0:
                sleep(.1)

        # Discard leftovers so the reset pool starts with an empty queue.
        with self.__taskLock:
            self.__tasks.clear()

        # Tell all the threads to quit.  Keep our own references:
        # __setThreadCountNolock(0) empties self.__threads, so that list
        # can no longer be used to join the workers afterwards.
        with self.__resizeLock:
            retired = list(self.__threads)
            self.__setThreadCountNolock(0)

        # Join outside the resize lock, so a running task that calls
        # getThreadCount() cannot deadlock against us.
        if waitForThreads:
            for worker in retired:
                worker.join()

        # Reset the pool for potential reuse.
        self.__isJoining = False

class ThreadPoolThread(threading.Thread):
    """Pooled worker thread.

    Repeatedly asks its pool for the next task and runs it, until
    goAway() is called.
    """

    # Seconds to sleep after finding the queue empty, before polling again.
    threadSleepTime = 0.1

    def __init__(self, pool):
        """Initialize the thread and remember the pool it serves."""
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):
        """Until told to quit, retrieve the next task and execute it.

        Each task is called as cmd(args).  If it was queued with a
        callback, the callback receives cmd's return value.  An exception
        raised by a task or its callback is logged and the worker carries
        on.  Otherwise the thread would die while the pool still counts
        it as a member.
        """
        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            if cmd is None:
                # Nothing to do: back off briefly before polling again.
                sleep(self.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                logging.getLogger(__name__).exception(
                    "Pooled task %r raised an exception in %s",
                    cmd, self.name)

    def goAway(self):
        """Make the run loop exit the next time it checks the flag.

        A task that is already running is allowed to finish first.
        """
        self.__isDying = True



这段100多行的代码实现了一个可以动态调整大小的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比Python官方给出的那个还要好些。它们实现的原理都是一样的,都使用了一个队列(Queue)来存储任务。

关于Python中线程同步的问题,这里有不错的介绍。

 

原文地址:https://www.cnblogs.com/nsnow/p/1706794.html