版本一:
1 # -*- coding:utf-8 -*-
2 import Queue
3 import threading
4
5
6 class ThreadPool(object):
7
8 def __init__(self, max_num=20):
9 self.queue = Queue.Queue(max_num)
10 for i in xrange(max_num):
11 self.queue.put(threading.Thread)
12
13 def get_thread(self):
14 return self.queue.get()
15
16 def add_thread(self):
17 self.queue.put(threading.Thread)
18
19 """
20 pool = ThreadPool(10)
21
22 def func(arg, p):
23 print arg
24 import time
25 time.sleep(2)
26 p.add_thread()
27
28
29 for i in xrange(30):
30 thread = pool.get_thread()
31 t = thread(target=func, args=(i, pool))
32 t.start()
33 """
版本二:
1 # -*- coding:utf-8 -*-
2
3 import queue
4 import threading
5 import contextlib
6 import time
7
8 StopEvent = object()
9
10
11 class ThreadPool(object):
12
13 def __init__(self, max_num, max_task_num = None):
14 if max_task_num:
15 self.q = queue.Queue(max_task_num)
16 else:
17 self.q = queue.Queue()
18 self.max_num = max_num
19 self.cancel = False
20 self.terminal = False
21 self.generate_list = []
22 self.free_list = []
23
24 def run(self, func, args, callback=None):
25 """
26 线程池执行一个任务
27 :param func: 任务函数
28 :param args: 任务函数所需参数
29 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
30 :return: 如果线程池已经终止,则返回True否则None
31 """
32 if self.cancel:
33 return
34 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
35 self.generate_thread()
36 w = (func, args, callback,)
37 self.q.put(w)
38
39 def generate_thread(self):
40 """
41 创建一个线程
42 """
43 t = threading.Thread(target=self.call)
44 t.start()
45
46 def call(self):
47 """
48 循环去获取任务函数并执行任务函数
49 """
50 current_thread = threading.currentThread()
51 self.generate_list.append(current_thread)
52
53 event = self.q.get()
54 while event != StopEvent:
55
56 func, arguments, callback = event
57 try:
58 result = func(*arguments)
59 success = True
60 except Exception as e:
61 success = False
62 result = None
63
64 if callback is not None:
65 try:
66 callback(success, result)
67 except Exception as e:
68 pass
69
70 with self.worker_state(self.free_list, current_thread):
71 if self.terminal:
72 event = StopEvent
73 else:
74 event = self.q.get()
75 else:
76
77 self.generate_list.remove(current_thread)
78
79 def close(self):
80 """
81 执行完所有的任务后,所有线程停止
82 """
83 self.cancel = True
84 full_size = len(self.generate_list)
85 while full_size:
86 self.q.put(StopEvent)
87 full_size -= 1
88
89 def terminate(self):
90 """
91 无论是否还有任务,终止线程
92 """
93 self.terminal = True
94
95 while self.generate_list:
96 self.q.put(StopEvent)
97
98 self.q.queue.clear()
99
100 @contextlib.contextmanager
101 def worker_state(self, state_list, worker_thread):
102 """
103 用于记录线程中正在等待的线程数
104 """
105 state_list.append(worker_thread)
106 try:
107 yield
108 finally:
109 state_list.remove(worker_thread)
110
111
112
113 # How to use
114
115
116 pool = ThreadPool(5)
117
118 def callback(status, result):
119 # status, execute action status
120 # result, execute action return value
121 pass
122
123
124 def action(i):
125 print(i)
126
127 for i in range(30):
128 ret = pool.run(action, (i,), callback)
129
130 time.sleep(5)
131 print(len(pool.generate_list), len(pool.free_list))
132 print(len(pool.generate_list), len(pool.free_list))
133 # pool.close()
134 # pool.terminate()