Python_Example_Process 进程 学习/经验/示例

 Author: 楚格

2018-11-16   20:08:32

IDE: Pycharm2018.02   Python 3.7   

KeyWord :  进程 multiprocess Process

Explain:  

--------

  1 # coding=utf-8
  2 #---------------------------------
  3 '''
  4 # Author  : chu ge 
  5 # Function:  进程 学习
  6 #
  7 '''
  8 #---------------------------------
  9 '''
 10 # --------------------------------
 11 # 导入模块 
 12 # 1.系统库
 13 # 2.第三方库
 14 # 3.相关定义库
 15 # --------------------------------
 16 '''
 17 # 1.系统库
 18 import sys
 19 import os
 20 import time
 21 import random
 22 
 23 #2.第三方库
 24 from multiprocessing import Process
 25 from multiprocessing import Pool
 26 from multiprocessing import Queue
 27 from multiprocessing import Manager
 28 
 29 
 30 
 31 #
 32 '''
 33 #》》》》》》》》》》
 34 进程
 35 
 36 ----------------------------------------------
 37 1.进程的创建 
 38 -fork 即 ret  = os.fork()
 39 # window中没有fork,Linux中才有fork
 40 
 41 -----------------------
 42 1.1进程VS程序
 43 编写完毕的代码,在没有运行的时候,称为程序;正在运行的代码,成为进程;
 44 进程,除了包含代码之外,还有需要运行环境等,所以和程序有区别
 45 
 46 -----------------------
 47 1.2 fork()。python的OS模块,包括fork. 
 48 
 49 e.g:
 50     pid = os.fork()
 51         if pid == 0:
 52             print("1")
 53         else:
 54             print("2")
 55 说明:
 56 程序执行到os.fork()时,操作系统会创建一个新的进程(子进程),
 57 然后复制父进程的所有信息到子进程中
 58 然后父进程和子进程都会从fork()函数中得到一个返回值,
 59 在子进程中这个值一定是0,而父进程中是子进程的ID号
 60 
 61 在Linux中,fork()系统函数,非常特殊。
 62 普通的函数调用,调用一次,但fork()调用一次,返回二次,
 63 因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),
 64 然后,分别在父进程和子进程内返回。
 65 
 66 -----------------------
 67 1.3子进程永远返回0,而父进程返回子进程的ID
 68     主进程即父进程
 69         ret > 0
 70     新建进程即子进程
 71         ret = 0
 72 e.g:
 73     ret = os.fork()
 74     print(ret)
 75 result:
 76     2506
 77     0
 78     
 79 -----------------------
 80 1.4一个父进程可以fork出很多子进程,所以父进程要记下每个子进程的ID,
 81 而子进程只需要调用.getpid()就可以拿到父进程的ID
 82 getpid() # 获取当前进程的值,子进程获取主进程的ID
 83 getppid()   #获取父进程的ID
 84 
 85 e.g:
 86     ret = os.fork()
 87     print(ret)
 88     if ret > 0:
 89         print("父进程---")
 90         pritn("%d"%os.getpid())
 91     else:
 92         print("子进程--")   
 93         pritn("%d - %d"%(os.getpid(),os.getppid()))
 94     
 95 result:
 96      20570     # 父进程
 97      父进程---  # 父进程中fork的返回值,
 98      20569     # 就是刚刚创建出的子进程的ID
 99      0         # 子进程
100      子进程--
101      20570 - 20569    
102 
103 -----------------------
104 1.5主进程结束不会因为子进程没有结束,而等待子进程
105 e.g:
106     ret = os.fork()
107     
108     if ret == 0:
109         print("--子进程--")
110         time.sleep(5)
111         print("--子进程 over ---",end="")
112     else:
113         print("---父进程")
114         time.sleep(3)
115 
116 -----------------------
117 1.6全局变量在多个进程中,不共享多进程修改全局变量
118 e.g:
119     g_num = 100
120     ret = os.fork()
121     
122     if ret == 0:
123         print("--process 1--")
124         g_num +=1
125         print("--process 1 ==%d"% g_num)
126     else:
127         time.sleep(3)
128         print("---process 2 ----")
129         print("--process 2 ==%d"% g_num)
130     
131 result:
132     --process 1--
133     --process 2 ==101
134     ---process 2 ----
135     --process 2 ==100
136     
137 -----------------------
138 1.7进程内容互不影响
139 进程间通信 :
140             管道和消息队列 同一台电脑
141             网络         不同台电脑
142 
143 多次fork的问题
144 # 父进程
145     ret = os.fork()
146     ret = os.fork()
147     ret = os.fork()
148 并列子进程 2^n = 2^3 = 8
149 
150 e.g:
151     # 父进程
152     ret = os.fork()
153     if ret == 0:
154         print("1)    #子进程
155     else:
156         print("2")  #父进程
157     
158     # 父子进程
159     ret = os.fork()
160     if ret == 0:
161         print("11")
162     else:
163         print("22")
164 result:
165     2
166     22
167     11    
168     1
169     22
170     11
171 一共4个进程
172 
173 e.g:
174     ret = os.fork()
175     if ret == 0:
176         print("1)
177     else:
178         print("2")
179         ret = os.fork()
180         if ret == 0:
181             print("11")
182         else:
183             print("22")
184 result:
185     2
186     22
187     11
188     1
189 一共三个进程
190 
191 ----------------------------------------------
192 2. multiprocessing模块     Windows环境下使用!!!
193 
194 -----------------------
195 2.1 提供了一个process类代表一个进程对象
196 
197 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,
198 用start()方法启动,这样创建进程比fork()还要简单。
199 join()方法等待子进程结束后,在继续下运行,通常用于进程间的同步
200 
201 2.1.1 Process语法结构如下:
202 Process([group [,target [,name [,args [,kwargs]]]]])
203 group:  大多数情况下用不到
204 target: 表示这个进程实例所调用的对象
205 name:   未当前进程实例的别名
206 args:   表示调用对象的位置参数元组
207 kwargs: 表示调用对象的关键字参数字典
208 
209 2.1.2 Process类常用方法:
210 is_alive():         判断进程实例是否还在执行;
211 join([timeout]):    是否等待进程实例执行结束,或者等待多少秒
212 start():            启动进程实例(创建子进程)
213 run():  如果没有给定target参数,对这个对象调用start()方法时,
214         就将执行对象中的run()方法
215 terminate():    不管任务是否完成,立即终止。
216 
217 2.1.3 Process类常用属性:
218 name:   当前进程实例别名,默认Process-N ,N从1开始递增
219 pid:    当前进程实例的PID值。(python id)
220 
221 
222 -----------------------
223 2.2进程的创建 -Process 子类
224 继承Process类
225 创建新的进程还能使用类的方式,可以自定义一个类,继承Process类,
226 每次实例化这个类的时候,就等同于实例化一个进程对象。
227 
228 
229 ----------------------------------------------
230 3. 进程池 Pool
231 
232 -----------------------
233 3.1 当需要创建的子进程数量不多时,
234 可以直接利用multiprocessing中的Process动态生成多个进程,
235 但是如果是上百个甚至上千个目标,手动的去创建进程工作量巨大,
236 此时就可以用multiprocessing模块提供的Pool方法。
237 
238 -----------------------
239 3.2初始化Pool时,可以指定一个最大进程数,
240 当有新的请求提交到Pool中时,
241 如果池还没有满,那么就会创建一个新的进程用来执行该请求;
242 但如果池中的进程数已经达到指定最大值,那么该请求就会等待,
243 直到池中有进程结束,才会创建新的进程来执行
244 
245 -----------------------
246 3.3 比较
247 apply():
248 apply是阻塞的。首先主进程开始运行,碰到子进程,操作系统切换到子进程,
249 等待子进程运行结束后,在切换到另外一个子进程,直到所有子进程运行完毕。
250 然后在切换到主进程,运行剩余的部分。
251 这样跟单进程串行执行没什么区别,子进程是顺序执行的,且子进程全部执行完毕后才继续执行主进程。
252 apply_async():         # 建议使用此进程池方式创建实例
253 apply_async 是异步非阻塞的。
254 即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。
255 首先主进程开始运行,碰到子进程后,主进程仍可以先运行,
256 等到操作系统进行进程切换的时候,在交给子进程运行。
257 可以做到不等待子进程执行完毕,主进程就已经执行完毕,并退出程序。
258 
259 -----------------------
260 此进程池作为主要创建进程的方式
261 -----------------------
262 
263 --------------------------------------------
264 4进程间通信-Queue
265 
266 -----------------------
267 4.1 Queue的使用
268 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,
269 Queue本身是一个消息队列程序,
270 
271 》》队列:先进先出       栈:先进后出
272 
273 说明:
274 初始化Queue()对象时(q=Queue)
275 若括号中没有指定最大可接受的消息数量或者数量为负值,
276 那么就代表可接受的消息数量没有上限(直到内存的尽头)
277 Queue.qsize():返回当前队列包含的消息数量。
278 Queue.empty():如果队列为空,返回True
279 Queue.full() :如果队列满了,返回True
280 
281 Queue.get([block [, timeout]]):获取队列中的一条消息,然后将其从队列移除,block默认值为True。
282     1)如果block使用默认值,且没有设置timeout(),消息队列如果为空,
283       此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止。
284       如果设置了timeout,则会等待timeout秒,若还没有读取到任何消息,
285       则抛出“Queue.Empty”异常。
286     2)如果block值为False,消息队列如果为空,则会立刻抛出“Queue.Empty”异常。
287 Queue.get_nowait(): 相当于Queue.get(False)
288 
289 Queue.put(item,[block [, timeout]]):将item消息写入队列,block默认值为True
290     1)如果block使用默认值,且没有设置timeout(),消息队列如果已经没有空间可写入,
291       此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止。
292       如果设置了timeout,则会等待timeout秒,若还没有空间,
293       则抛出“Queue.Full”异常。
294     2)如果block值为False,消息队列如果没有空间写入,则会立刻抛出“Queue.Full”异常。
295 Queue.put_nowait(item): 相当于Queue.put(item,False)  
296 
297 -----------------------
298 4.2 进程池中的Queue
299 如果要是有Pool创建进程,就要使用multiprocess.manager()的Queue()。
300 而不是multiprocess.Queue(),否则会得到错误消息。
301 
302 -----------------------
303 
304 
305 ------------------------------------------
306 
307 -------------------------------------------------------
308 >>>>   重要
309 -------------------------------------------------------
310 
311 2.Process 创建进程
312 2.1例程 进程
313 
314 # 1.系统库
315 import sys
316 import os
317 import time
318 import random
319 
320 #2.第三方库
321 from multiprocessing import Process
322 from multiprocessing import Pool
323 from multiprocessing import Queue
324 
325 -----------------------------
326 
327 单进程 示例
328 
329 # 任务1
330 def test():
331     while True:
332         print("--- test function ---")
333         time.sleep(1)
334         
335 if __name__ == "__main__":
336     pool = Process(target=test)     # 实例对象
337     pool.start()                    # 让这个进程开始执行
338                                     # test函数里的代码
339     # 主进程
340     while True:
341         print("---main---")
342         time.sleep(1)
343 
344 -----------------------------
345 多进程
346 
347 def Function_Test_A():
348     print("-A1")
349     while True:
350         print("-Function_Test_A---")
351         print("-A2")
352         time.sleep(1)
353         print("-A3")
354 
355 def Function_Test_B():
356     print("--B1")
357     while True:
358         print("--Function_Test_B---")
359         print("--B2")
360         time.sleep(2)
361         print("--B3")
362 
363 def Function_Test_C():
364     print("---C1")
365     while True:
366         print("---Function_Test_C---")
367         print("---C2")
368         time.sleep(4)
369         print("---C3")
370 
371 
372 if __name__ == "__main__":
373 
374     pool_A = Process(target=Function_Test_A)
375     pool_A.start()  # 让这个进程开始执行实例函数里的代码
376     pool_B = Process(target=Function_Test_B)
377     pool_B.start()  # 让这个进程开始执行实例函数里的代码
378     pool_C = Process(target=Function_Test_C)
379     pool_C.start()  # 让这个进程开始执行实例函数里的代码
380 
381     var = 1000
382     print("M1")
383     # 主进程
384     while True:
385         print("Main=============")
386         print("M2")
387         print(var)
388         var += 1
389         print("M3")
390         time.sleep(0.5)
391         print("M4")
392 
393 说明:
394 主进程等待Process子进程先结束,即主进程最后结束.(注释主进程,可以体现此现象)
395 
396 -------------------------
397 
398 等待join() 等待超时时间
399 
400 def Function_Test_D():
401     print("----D1")
402     print("----Function_Test_D----")
403     print("----D2")
404     for i in range(random.randint(1, 5)):
405         print("----D = [%d]" % i)
406         print("----D3")
407         time.sleep(0.1)
408         print("----D4")
409     print("----D5")
410 
411 if __name__ == "__main__":
412     pool_D = Process(target=Function_Test_D)
413     pool_D.start()  # 让这个进程开始执行实例函数里的代码
414     pool_D.join()   # 等待此对象实例子进程结束为止
415 
416     print("main")
417 
418 说明:pool.join() #堵塞 子进程结束后才能执行 print("main")
419 -------------------------
420 
421 继承Process类 创建进程
422 
423 class Process_Class(Process):
424     # 因为Process类本身也有__init__方法,这个子类相当于重新写了这个方法。
425     # 但是这样带来新的问题,我们并没有完全的初始化一个Process类,
426     # 所有就不建议使用。
427     # 最好的方法就是将继承本身传递给Process.__init__方法,
428     # 完成这些初始化。
429 
430     def __init__(self, interval):
431         Process.__init__(self)
432         self.interval = interval
433 
434     # 重新写了Process类的方法run()方法
435     def run(self):
436         print("子进程(%s)开始执行,父进程(%s)" % (os.getpid(), os.getppid()))
437         t_start = time.time()
438         time.sleep(self.interval)
439         t_stop = time.time()
440         print("(%s)执行结束,耗时%0.2f秒" % (os.getpid(), (t_stop - t_start)))
441 
442 if __name__ == "__main__":
443     t_start = time.time()
444     print("当前程序进程(%s)" % (os.getpid()))
445     pool_E = Process_Class(2)
446     # target: 表示这个进程实例所调用的对象
447     # start():            启动进程实例(创建子进程)
448     # run():  如果没有给定target参数,对这个对象调用start()方法时,
449     #         就将执行对象中的run()方法
450     # terminate():    不管任务是否完成,立即终止。
451 
452     #对一个不包含target属性的Process类执行start()就会运行这个类中run()
453     #
454     pool_E.start()
455     pool_E.join()
456     t_stop = time.time()
457     print("(%s)执行结束,耗时%0.2f秒" % (os.getpid(), (t_stop - t_start)))
458 
459 result:
460 当前程序进程(24264)
461 子进程(24296)开始执行,父进程(24264)
462 (24296)执行结束,耗时2.00秒
463 (24264)执行结束,耗时2.23秒
464 
465 说明:类对象创建进程,基于上述创建,只是时间不同。
466     interval = 2s 间隔时间 2秒
467 
468 -------------------------
469 
470 进程池
471 
472 # 进程池任务
473 def Function_Test_F(num):
474     # random.random()随机生成0-1之间的浮点数
475     for i in range(2):
476         print("===pid=%d==num=%d="%(os.getpid(), num))
477         time.sleep(1)
478 
479 if __name__ == "__main__":
480     # 定义一个进程池,最大进程数
481     # 3表示:进程池中最多有3个进程一起执行
482     pool_name = Pool(3)
483 
484     for i in range(5):
485         print("---%d---"%i)
486         #向进程池中添加任务
487         #注意:如果添加的任务数量,超过了进程池中进程的最大个数的话,
488         #     那么不会导致添加不进入,添加到进程中的任务.
489         #     如果还没有被执行的话,那么此时他们会等待进程池中的,
490         #     进程完成一个任务之后,会自动的去用刚刚的那个进程,
491         #     完成当前的新任务
492         #pool_F.apply_async(要调用的目标,(传递给目标的参数()元组,))
493         pool_name.apply_async(Function_Test_F, (i,))
494 
495     # 关闭进程池,相当于不能够再次添加新任务了,pool_F不再接收新的请求
496     pool_name.close()
497     #等待pool_F中所有子进程执行完成,必须放在close语句之后
498     pool_name.join()
499     # 主进程 创建/添加 任务后,主进程默认不会等待进程池中的任务执行完后才结束
500     # 而是当主进程的任务做完之后 立马结束,
501     # 如果这个地方没join,会导致进程池中的任务不会执行
502 
503 result:
504 ---0---
505 ---1---
506 ---2---
507 ---3---
508 ---4---
509 ===pid=25644==num=0=
510 ===pid=26392==num=1=
511 ===pid=25992==num=2=
512 ===pid=25644==num=0=
513 ===pid=26392==num=1=
514 ===pid=25992==num=2=
515 ===pid=25644==num=3=
516 ===pid=26392==num=4=
517 ===pid=25644==num=3=
518 ===pid=26392==num=4=
519 
520 说明:可以做创建进程标准模式
521 规定一定数量进程后,不再考虑进程堵塞问题
522 
523 此进程池作为主要创建进程的方式
524 
525 -------------------------
526 
527 进程间通信
528 
529 if __name__ == "__main__":
530     # 初始化一个Queue对象,最多可接受三条put消息
531     queue = Queue(3)
532     queue.put("message 1 ")
533     queue.put("message 2 ")
534     print(queue.full())    # False
535     queue.put("message 3 ")
536     print(queue.full())    # True
537     
538     # 因为消息队列已满,下面的try都会抛出异常,
539     # 第一个等待2秒再抛出异常
540     # 第二个等待4秒再抛出异常
541     try:
542         queue.put("message 4 ",True,2)
543     except Exception:
544         print("message full,current number: [%s]" % (queue.qsize()))
545     
546     try:
547         queue.put("message 5 ",True,2)
548     except Exception:
549         print("message full,current number: [%s]" % (queue.qsize()))
550     
551     # 推荐的方式,先判断消息队列是否已满,在写入
552     if not queue.full():
553         queue.put("message 6 ")
554     
555     # 读取消息时,先判断消息队列是否为空,在读取
556     if not queue.empty():
557         for var in range(queue.qsize()):
558             print(queue.get_nowait())
559 
560 result:
561     False
562     True
563     message full,current number: [3]
564     message full,current number: [3]
565     message 1 
566     message 2 
567     message 3 
568 
569 -------------------------
570 
571 Queue 进程间通信 读写数据
572 
573 # 写数据进程执行的代码
574 def Function_Write(queue_name_data):
575     local_var_queue_data = queue_name_data
576     for value in ["A", "B", "C", "D"]:
577         print("put [%s] to queue " % (value))
578         local_var_queue_data.put(value)
579         time.sleep(random.random())
580 
581 # 读取数据进程执行的代码:
582 def Function_Read(queue_name_data):
583     local_var_queue_data = queue_name_data
584     while True:
585         if not local_var_queue_data.empty():
586             value = local_var_queue_data.get(True)
587             print("get [%s] from queue " % (value))
588             time.sleep(random.random())
589         else:
590             break
591             
592 if __name__ == "__main__":
593 
594     # 父进程创建Queue,并传给各个子进程: 实例 Queue读写数据
595     queue_name = Queue()
596     proc_write = Process(target = Function_Write, args = (queue_name,))
597     proc_read = Process(target = Function_Read,  args = (queue_name,))
598 
599     proc_write.start()  # 启动子进程pw,写入
600     proc_write.join()   # 等待pw结束
601     proc_read.start()  # 启动子进程pr,写入
602     proc_read.join()   # 等待pr结束
603 
604     # pr进程是死循环,无法等待其结束,只能强行终止
605     print("...")
606     print("所有数据都写入并且读完")
607     
608 result:
609     put [A] to queue 
610     put [B] to queue 
611     put [C] to queue 
612     put [D] to queue 
613     get [A] from queue 
614     get [B] from queue 
615     get [C] from queue 
616     get [D] from queue 
617     ...
618     所有数据都写入并且读完
619 
620 -------------------------
621 # 进程池中通信
622 def Function_reader(queue_name_data):
623     local_var_queue_data = queue_name_data
624     print(" reader 启动(%s),父进程为(%s)" % (os.getpid(),os.getppid()))
625     for i in range(local_var_queue_data.qsize()):
626         print(" Function_reader 从Queue获取到消息: %s " % (local_var_queue_data.get(True)))
627 
628 
629 def Function_writer(queue_name_data):
630     local_var_queue_data = queue_name_data
631     print(" writer 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
632     for var in "dongGe":
633         local_var_queue_data.put(var)
634 
635 if __name__ == "__main__":
636     # 进程池中Queue 通信
637     print("(%s) start" % (os.getpid()))
638     queue_name = Manager().Queue() # 使用Manager中的Queue来初始化
639     pool_name = Pool()
640     # 使用阻塞模式创建进程,这样就不需要再reader中使用死循环
641     pool_name.apply(Function_writer,(queue_name,))
642     pool_name.apply(Function_reader,(queue_name,))
643     pool_name.close()
644     pool_name.join()
645     print("(%s) end" % (os.getpid()))
646 
647 result:
648      (36900) start
649      writer 启动(37760),父进程为(36900)
650      reader 启动(37020),父进程为(36900)
651      Function_reader 从Queue获取到消息: d 
652      Function_reader 从Queue获取到消息: o 
653      Function_reader 从Queue获取到消息: n 
654      Function_reader 从Queue获取到消息: g 
655      Function_reader 从Queue获取到消息: G 
656      Function_reader 从Queue获取到消息: e 
657      (36900) end
658 
659 -------------------------
660 
661 ======================================================
662 
663 线程
664 
665 1.如果多个线程执行的都是同一个函数的话,
666 各自之间互不影响,各自执行
667 
668 
669 
670 
671 
672 ======================================================
673 '''
674 
675 
676 '''
677 # ============================================================================
678 # Function:  
679 # Explain :  输入参数   
680 #         :  输出参数  
681 # ============================================================================
682 '''
683 
684 #-------------------------
685 # 单进程任务
686 def Function_Test_A():
687     print("-A1")
688     while True:
689         print("-Function_Test_A---")
690         print("-A2")
691         time.sleep(1)
692         print("-A3")
693 
694 #-------------------------
695 # 多进程任务
696 def Function_Test_B():
697     print("--B1")
698     while True:
699         print("--Function_Test_B---")
700         print("--B2")
701         time.sleep(2)
702         print("--B3")
703 
704 #-------------------------
705 # 多进程任务
706 def Function_Test_C():
707     print("---C1")
708     while True:
709         print("---Function_Test_C---")
710         print("---C2")
711         time.sleep(4)
712         print("---C3")
713 
714 #-------------------------
715 # 多进程任务,join()
716 def Function_Test_D():
717     print("----D1")
718     print("----Function_Test_D----")
719     print("----D2")
720     for i in range(random.randint(1, 5)):
721         print("----D = [%d]" % i)
722         print("----D3")
723         time.sleep(0.1)
724         print("----D4")
725     print("----D5")
726 
727 #-------------------------
728 # 继承Process类 新建进程
729 class Process_Class(Process):
730     # 因为Process类本身也有__init__方法,这个子类相当于重新写了这个方法。
731     # 但是这样带来新的问题,我们并没有完全的初始化一个Process类,
732     # 所有就不建议使用。
733     # 最好的方法就是将继承本身传递给Process.__init__方法,
734     # 完成这些初始化。
735 
736     def __init__(self, interval):
737         Process.__init__(self)
738         self.interval = interval
739 
740     # 重新写了Process类的方法run()方法
741     # start()调用run()方法
742     def run(self):
743         print("子进程(%s)开始执行,父进程(%s)" % (os.getpid(), os.getppid()))
744         t_start = time.time()
745         time.sleep(self.interval)
746         t_stop = time.time()
747         print("(%s)执行结束,耗时%0.2f秒" % (os.getpid(), (t_stop - t_start)))
748 
749 #-------------------------
750 # 进程池任务
751 def Function_Test_F(num):
752     # random.random()随机生成0-1之间的浮点数
753     for i in range(2):
754         print("===pid=%d==num=%d="%(os.getpid(), num))
755         time.sleep(1)
756 
757 #-------------------------
758 # 写数据进程执行的代码
759 def Function_Write(queue_name_data):
760     local_var_queue_data = queue_name_data
761     for value in ["A", "B", "C", "D"]:
762         print("put [%s] to queue " % (value))
763         local_var_queue_data.put(value)
764         time.sleep(random.random())
765 
766 # 读取数据进程执行的代码:
767 def Function_Read(queue_name_data):
768     local_var_queue_data = queue_name_data
769     while True:
770         if not local_var_queue_data.empty():
771             value = local_var_queue_data.get(True)
772             print("get [%s] from queue " % (value))
773             time.sleep(random.random())
774         else:
775             break
776 #-------------------------
777 # 进程池中通信
778 def Function_reader(queue_name_data):
779     local_var_queue_data = queue_name_data
780     print(" reader 启动(%s),父进程为(%s)" % (os.getpid(),os.getppid()))
781     for i in range(local_var_queue_data.qsize()):
782         print(" Function_reader 从Queue获取到消息: %s " % (local_var_queue_data.get(True)))
783 
784 
785 def Function_writer(queue_name_data):
786     local_var_queue_data = queue_name_data
787     print(" writer 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
788     for var in "dongGe":
789         local_var_queue_data.put(var)
790 
791 
792 #-------------------------
793 
794 # ============================================================================
795 '''
796 # ============================================================================
797 #   测试专用
798 # ============================================================================
799 '''
800 if __name__ == "__main__":
801 
802     # #-------------------------
803     # # 单/多进程
804     # pool_A = Process(target=Function_Test_A)
805     # pool_A.start()  # 让这个进程开始执行实例函数里的代码
806     # pool_B = Process(target=Function_Test_B)
807     # pool_B.start()  # 让这个进程开始执行实例函数里的代码
808     # pool_C = Process(target=Function_Test_C)
809     # pool_C.start()  # 让这个进程开始执行实例函数里的代码
810     # var = 1000
811     # print("M1")
812     # while True:
813     #     print("Main=============")
814     #     print("M2")
815     #     print(var)
816     #     var += 1
817     #     print("M3")
818     #     time.sleep(0.5)
819     #     print("M4")
820 
821 
822     # #-------------------------
823     # #添加join()
824     # pool_D = Process(target=Function_Test_D)
825     # pool_D.start()  # 让这个进程开始执行实例函数里的代码
826     # pool_D.join()   # 等待此对象实例子进程结束为止
827     # print("main")
828 
829 
830     # #-------------------------
831     # #继承Process类 新建进程
832     # t_start = time.time()
833     # print("当前程序进程(%s)" % (os.getpid()))
834     # # interval = 2s 间隔时间 2秒
835     # pool_E = Process_Class(2)
836     # # target :  表示这个进程实例所调用的对象
837     # # start(): 启动进程实例(创建子进程)
838     # # run()  :   如果没有给定target参数,对这个对象调用start()方法时,
839     # #          就将执行对象中的run()方法
840     # # terminate(): 不管任务是否完成,立即终止。
841     # # 对一个不包含target属性的Process类执行start()就会运行这个类中run()
842     # pool_E.start() # start()调用run()方法
843     # pool_E.join()
844     # t_stop = time.time()
845     # print("(%s)执行结束,耗时%0.2f秒" % (os.getpid(), (t_stop - t_start)))
846 
847 
848     # #-------------------------
849     # # 进程池
850     # # 定义一个进程池,最大进程数
851     # # 3表示:进程池中最多有3个进程一起执行
852     # pool_name = Pool(3)
853     #
854     # for i in range(5):
855     #     print("---%d---"%i)
856     #     #向进程池中添加任务
857     #     #注意:如果添加的任务数量,超过了进程池中进程的最大个数的话,
858     #     #     那么不会导致添加不进入,添加到进程中的任务.
859     #     #     如果还没有被执行的话,那么此时他们会等待进程池中的,
860     #     #     进程完成一个任务之后,会自动的去用刚刚的那个进程,
861     #     #     完成当前的新任务
862     #     #pool_F.apply_async(要调用的目标,(传递给目标的参数()元组,))
863     #     pool_name.apply_async(Function_Test_F, (i,))
864     #
865     # # 关闭进程池,相当于不能够再次添加新任务了,pool_F不再接收新的请求
866     # pool_name.close()
867     # #等待pool_F中所有子进程执行完成,必须放在close语句之后
868     # pool_name.join()
869     # # 主进程 创建/添加 任务后,主进程默认不会等待进程池中的任务执行完后才结束
870     # # 而是当主进程的任务做完之后 立马结束,
871     # # 如果这个地方没join,会导致进程池中的任务不会执行
872 
873 
874 
875     # # #-------------------------
876     # # 初始化一个Queue对象,最多可接受三条put消息
877     # queue = Queue(4)
878     # queue.put("message 1 ")
879     # queue.put("message 2 ")
880     # print(queue.full())    # False
881     # queue.put("message 3 ")
882     # print(queue.full())    # True
883     #
884     # # 因为消息队列已满,下面的try都会抛出异常,
885     # # 第一个等待2秒再抛出异常
886     # # 第二个等待4秒再抛出异常
887     # try:
888     #     queue.put("message 4 ",True,2)
889     # except Exception:
890     #     print("message full,current number: [%s]" % (queue.qsize()))
891     #
892     # try:
893     #     queue.put("message 5 ",True,2)
894     # except Exception:
895     #     print("message full,current number: [%s]" % (queue.qsize()))
896     #
897     # # 推荐的方式,先判断消息队列是否已满,在写入
898     # if not queue.full():
899     #     queue.put("message 6 ")
900     #
901     # # 读取消息时,先判断消息队列是否为空,在读取
902     # if not queue.empty():
903     #     print(queue.qsize())
904     #     for var in range(queue.qsize()):
905     #         print(queue.get_nowait())
906     #
907 
908 
909     # # #-------------------------
910     # # 父进程创建Queue,并传给各个子进程: 实例 Queue读写数据
911     # queue_name = Queue()
912     # proc_write = Process(target = Function_Write, args = (queue_name,))
913     # proc_read = Process(target = Function_Read,  args = (queue_name,))
914     #
915     # proc_write.start()  # 启动子进程pw,写入
916     # proc_write.join()   # 等待pw结束
917     # proc_read.start()  # 启动子进程pr,写入
918     # proc_read.join()   # 等待pr结束
919     #
920     # # pr进程是死循环,无法等待其结束,只能强行终止
921     # print("...")
922     # print("所有数据都写入并且读完")
923 
924 
925     # # #-------------------------
926     # # 进程池中Queue 通信
927     # print("(%s) start" % (os.getpid()))
928     # queue_name = Manager().Queue() # 使用Manager中的Queue来初始化
929     # pool_name = Pool()
930     # # 使用阻塞模式创建进程,这样就不需要再reader中使用死循环
931     # pool_name.apply(Function_writer,(queue_name,))
932     # pool_name.apply(Function_reader,(queue_name,))
933     # pool_name.close()
934     # pool_name.join()
935     # print("(%s) end" % (os.getpid()))
936 
937 
938     print(" learn finish")
939 
940 
941     

--

 finish

--------

原文地址:https://www.cnblogs.com/caochucheng/p/9971302.html