python 线程与进程

python多进程与多线程

  1 ##############################
  2 #多进程
  3 ##############################
  4 #import os
  5 '''
  6 print('1Process start, pid:',os.getpid())
  7 print('Process start, ppid:',os.getppid())
  8 '''
  9 '''
 10 #unix环境支持
 11 pid = os.fork()
 12 if pid == 0:
 13     print('child process,id:', os.getpid(), 'parent id is:', os.getppid())
 14 else:
 15     print('pid:', os.getpid(), 'create child process:', pid)
 16 '''
 17 
 18 #################
 19 #multiprocessing
 20 #################
 21 '''
 22 from multiprocessing import Process
 23 print('2Process start, pid:',os.getpid())
 24 def run_proc(name):
 25 #    while(1):
 26 #        pass
 27     print('run child process %s (%s)...' %(name, os.getpid()))
 28 
 29 print('4Process start, pid:',os.getpid())
 30 if __name__ == '__main__':
 31     print('1Parent process %s.' % os.getpid())
 32     p = Process(target=run_proc, args=('test',))
 33     print('Child process will start.', os.getpid())
 34     #启动子进程
 35     p.start()
 36     print('3Process start, pid:',os.getpid())
 37     #等待子进程结束, 类似于wait()
 38     p.join()
 39     print('Child process end.', os.getpid())
 40 '''
 41 
 42 #由上面这个例子可以看出,python的代码是顺序执行的。
 43 #执行p.start()方法后,开始执行子进程
 44 #子进程的执行范围是2、4之间的代码行
 45 #p.join()等待子进程返回
 46 #单纯从打印的顺序,无法准确的判断父子进程的执行顺序
 47 
 48 
 49 '''
 50 from multiprocessing import Pool
 51 import os, time, random
 52 def long_time_task(name):
 53     print('Run task %s (%s)...' % (name, os.getpid()))
 54     start = time.time()
 55     time.sleep(random.random()*3)
 56     end = time.time()
 57     print('Task %s runs %0.2f seconds.' % (name, (end - start)))
 58 
 59 if __name__=='__main__':
 60     print('Parent process %s.' % os.getpid())
 61     p = Pool(5)#设置最大线程格个数,默认值是cpu核心数
 62     for i in range(5):
 63         p.apply_async(long_time_task, args=(i,))#创建线程
 64     print('Waiting for all subprocesses done...')
 65     p.close()#防止将任何其他任务提交到池中。完成所有任务后,工作进程将退出。
 66     p.join()#调用前,必须调用close() or terminate()
 67     print('All subprocess done.')
 68 '''
 69 #win7下cmd输出内容如下
 70 '''
 71 Parent process 25380.
 72 Waiting for all subprocesses done...
 73 Run task 0 (26008)...
 74 Run task 1 (26216)...
 75 Run task 2 (28108)...
 76 Run task 3 (24732)...
 77 Run task 4 (25008)...
 78 Task 3 runs 0.15 seconds.
 79 Task 4 runs 1.79 seconds.
 80 Task 2 runs 2.45 seconds.
 81 Task 0 runs 2.69 seconds.
 82 Task 1 runs 2.98 seconds.
 83 All subprocess done.
 84 请按任意键继续. . .
 85 '''
 86 
 87 ###########
 88 #子进程
 89 ###########
 90 '''
 91 import subprocess #类似于exec()?
 92 
 93 print ('$ nslookup www.python.org')
 94 r = subprocess.call(['nslookup', 'www.python.org'])
 95 print('Exit code:', r)
 96 '''
 97 ''' win7输出
 98 $ nslookup www.python.org
 99 服务器:  clou-ad.szclou.com
100 Address:  10.98.94.5
101 
102 非权威应答:
103 名称:    dualstack.python.map.fastly.net
104 Addresses:  2a04:4e42:36::223
105           151.101.228.223
106 Aliases:  www.python.org
107 
108 Exit code: 0
109 请按任意键继续. . .
110 '''
111 '''
112 import subprocess
113 print('$ nslookup')
114 p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
115 output, err = p.communicate(b'set q=mx
python.org
exit
')
116 s='utf-8'#unix
117 s1='gbk'#win7系统
118 print(output.decode(s))
119 print('Exit code:', p.returncode)
120 '''
121 #相当于命令行下执行nslookup,然后输入
122 #set q=mx
123 #python.org
124 #exit
125 
126 '''win7输出
127 $ nslookup
128 默认服务器:  clou-ad.szclou.com
129 Address:  10.98.94.5
130 
131 > > 服务器:  clou-ad.szclou.com
132 Address:  10.98.94.5
133 
134 python.org      MX preference = 50, mail exchanger = mail.python.org
135 
136 mail.python.org internet address = 188.166.95.178
137 mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1
138 >
139 Exit code: 0
140 请按任意键继续. . .
141 '''
142 
143 ###########
144 #进程间通信
145 ###########
146 '''
147 from multiprocessing import Process, Queue
148 import os, time, random
149 
150 def write(q):
151     print('Process to write: %s' % os.getpid())
152     for value in ['A', 'B', 'C']:
153         print('put %s to queue.' % value)
154         q.put(value)
155         time.sleep(random.random())
156 
157 def read(q):
158     print('Process to read: %s' % os.getpid())
159     while True:
160         value = q.get(True)
161         print('Get %s from queue.' % value)
162 
163 if __name__=='__main__':
164     q=Queue()
165     pw = Process(target=write, args=(q,))
166     pr = Process(target=read, args=(q,))
167 
168     pw.start()
169     pr.start()
170     pw.join()
171     pr.terminate()
172 '''
173 '''
174 Process to write: 28536
175 Process to read: 27968
176 put A to queue.
177 Get A from queue.
178 put B to queue.
179 Get B from queue.
180 put C to queue.
181 Get C from queue.
182 请按任意键继续. . .
183 '''
184 
185 ##############################
186 #多线程:python的多线程是真正的Posix Thread,而不是模拟出来的(linux)
187 ##############################
188 
189 #Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,
190 #threading是高级模块,对_thread进行了封装。绝大多数情况下,
191 #我们只需要使用threading这个高级模块。
192 '''
193 import time, threading
194 
195 print('thread %s is running...' % threading.current_thread().name)
196 
197 def loop():
198     print('thread %s is running...' % threading.current_thread().name)
199     n = 0
200     while n < 5:
201         n = n + 1
202         print('thread %s >>> %s' % (threading.current_thread().name, n))
203         time.sleep(1)
204     print('thread %s ended.' % threading.current_thread().name)
205     
206 
207 print('print something')
208 t=threading.Thread(target=loop, name='LoopThread')
209 t.start()
210 t.join()
211 print('thread %s ended.' % threading.current_thread().name)
212 '''
213 '''
214 thread MainThread is running...
215 thread LoopThread is running...
216 thread LoopThread >>> 1
217 thread LoopThread >>> 2
218 thread LoopThread >>> 3
219 thread LoopThread >>> 4
220 thread LoopThread >>> 5
221 thread LoopThread ended.
222 thread MainThread ended.
223 请按任意键继续. . .
224 '''
225 '''
226 任何进程默认启动一个进程,名字是MainThread。
227 threading.current_thread()返回当前线程的实例
228 线程创建时,如果不指定名称,默认是Thread-1,.......
229 '''
230 #定义锁
231 '''
232 lock = threading.Lock()
233 lock.acquire()
234 lock.release()
235 '''
236 
237 import time, threading, multiprocessing
238 '''
239 def loop():
240     x = 0
241     while True:
242         x = x ^ 1
243 
244 print('cpu:%s' % multiprocessing.cpu_count())
245 for i in range(multiprocessing.cpu_count()):
246     t = threading.Thread(target=loop)
247     t.start()
248 '''
249 
250 
251 ###########
252 #ThreadLocal:管理线程数据的全局变量
253 ###########
254 '''
255 import threading
256 #全局变量local_school,一个ThreadLocal对象
257 #但每个属性,如local_school.student都是线程的局部变量,不会相互干扰
258 
259 #ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,
260 #用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
261 local_school = threading.local()
262 
263 def process_student():
264     std = local_school.student
265     print('Hello, %s (in %s)' % (std, threading.current_thread().name))
266 
267 def process_thread(name):
268     local_school.student = name
269     process_student()
270 
271 t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
272 t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
273 t1.start()
274 t2.start()
275 t1.join()
276 t2.join()
277 '''
278 
279 '''
280 计算密集型:cpu利用率高,C语言的运行效率最高,脚本语言效率低
281 IO密集型:脚本语言的开发效率高,c语言最差
282 协程:单线程的一步编程模型。有了协程的支持,就可以基于事件驱动编写高效的多任务程序
283 '''
284 
285 
286 ###########
287 #分布式进程
288 ###########
multiProcess.py

分布式进程示例:

 1 #task_master.py
 2 import random, time, queue
 3 from multiprocessing.managers import BaseManager
 4 #发送任务队列
 5 task_queue = queue.Queue()
 6 #接收结果队列
 7 result_queue = queue.Queue()
 8 
 9 #从BaseManager继承的QueueManager
10 class QueueManager(BaseManager):
11     pass
12 
13 # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
14 QueueManager.register('get_task_queue', callable=lambda:task_queue)
15 QueueManager.register('get_result_queue', callable=lambda:result_queue)
16 
17 #创建实例对象
18 #绑定端口5000, 设置验证码 'abc'
19 manager = QueueManager(address=('', 5000), authkey=b'abc')
20 manager.start()
21 #获得通过网络访问的Queue对象
22 task = manager.get_task_queue()
23 result = manager.get_result_queue()
24 
25 time.sleep(10)
26 #队列中添加任务
27 for i in range(10):
28     n = random.randint(0, 10000)
29     print('Put task %d...' % n)
30     task.put(n)
31 #读取结果
32 print('Try get results...')
33 for i in range(10):
34     r = result.get(timeout=10)
35     print('Result:%s' % r)
36 
37 manager.shutdown()
38 print('master exit.')
 1 #task_worker.py
 2 import queue, time, sys
 3 from multiprocessing.managers import BaseManager
 4 
 5 class QueueManager(BaseManager):
 6     pass
 7 
 8 QueueManager.register('get_task_queue')
 9 QueueManager.register('get_result_queue')
10 
11 server_addr = '127.0.0.1'
12 print('Connect to server %s...' % server_addr)
13 m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
14 m.connect()
15 task = m.get_task_queue()
16 result = m.get_result_queue()
17 
18 '''
19 for i in range(10):
20     try:
21         n = task.get(timeout=1)
22         print('run task %d * %d...' % (n, n))
23         r = '%d * %d = %d' % (n, n, n*n)
24         time.sleep(1)
25         result.put(r)
26     except queue.Empty:
27         print('task queue is empty.')
28 '''
29 t1 = time.time()
30 while 1:
31 #    if not isinstance(task, queue.Queue()):
32 #        raise ValueError('invalid Queue:%s' % task)
33     try:
34         if task.empty():
35             time.sleep(1)
36             print('queue is empty')
37             continue
38         n = task.get(timeout=1)
39         print('run task %d * %d...' % (n, n))
40         r = '%d * %d = %d' % (n, n, n*n)
41         result.put(r)
42         time.sleep(1)
43         if time.time() - t1 > 60:
44             print('over 60s')
45             break
46     except BaseException as f:
47         print('Error:', f)
48         break;
49 
50 print('worker exit.')

可以多次执行work文件,创建多个进程,各个work进程中处理的数据之和是master进程中的数据,且各work处理的数据不会重复。

目前只能在Ubuntu12.04上执行,win7下可能由于权限问题,无法正常执行。

原文地址:https://www.cnblogs.com/mofei004/p/9438264.html