0704 Process继承实现多进程、Pool进程池,进程间通过队列通信,Pool实现多进程实现复制文件

通过继承的方式,实现Process多进程

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyNewProcess(Process):
 5     def run(self):
 6         for i in range(10):
 7             print("----run----")
 8             time.sleep(1)
 9 
10 
11 if __name__ == "__main__":
12     p = MyNewProcess()
13 
14     p.start()        # Process 中的 start 方法会主动调用 run 方法
15 for i in range(10): print("---main---") time.sleep(1)

Pool 进程池实现多进程

 1 import time
 2 from multiprocessing import Pool
 3 
 4 def worker():
 5     for i in range(10):
 6         print("From worker %s"%i)
 7         time.sleep(0.5)
 8 
 9 def foo():
10     for i in range(10):
11         print("From foo %s"%i)
12         time.sleep(0.5)
13 
14 def bar():
15     for i in range(10):
16         print("From bar %s"%i)
17         time.sleep(0.5)
18 
19 if __name__ == "__main__":
20     pool = Pool(3)            # 创建三个 进程
21     pool.apply_async(worker)
22     pool.apply_async(foo)
23     pool.apply_async(bar)
24 
25     pool.close()                  # 关闭进程池,禁止添加任务
26     pool.join()          # 等待子进程结束后,主进程才往下走
27     print("Is done...")
28 
29 
30 
31 # process and Pool 最后都是调用 fork
32 # 通常情况下,主进程一半用来等低啊,,,,真正的任务子进程中执行

Queue队列的简单使用

 1 from multiprocessing import Queue
 2 
 3 q = Queue(3)    # 初始化一个Queue对象,最多可以put三条信息,如果不写3,那么久无限制
 4 
 5 q.put("Message01")         # 添加信息的方法
 6 q.put("Message02")
 7 print(q.full())          # 查看 队列 是否满了的方法
 8 
 9 q.put("Message03")
10 print(q.full())
11 
12 # 因为队列已经满了,所以下面的消息会出现异常,第一个 try 会等待2秒后再抛出异常,
13 # 第二个 try 会立刻抛出异常
14 try:
15     q.put("Message04", True, 2)
16 except:
17     print("消息队列已满,现有消息数量:%s"%q.qsize())
18 
19 try:
20     q.put_nowait("Message04")
21 except:
22     print("消息队列已满,现有消息数量:%s"%q.qsize())
23 
24 # 推荐使用的方式,先判断队列是否已满,再写入
25 if not q.full():
26     q.put_nowait("Message04")
27 
28 # 读取消息的时候,先判断消息队列是否为空,再读取
29 if not q.empty():
30     for i in range(q.qsize()):
31         print(q.get_nowait())

Process配合Queue实现进程间通信

 1 from multiprocessing import Process, Queue
 2 import time, random
 3 
 4 # 写数据进程执行的代码
 5 def write(q):
 6     for value in ['a', 'b', 'c']:
 7         print("Put %s to queue..."%value)
 8         q.put(value)
 9         time.sleep(random.random())
10 
11 # 读数据进程的代码
12 def read(q):
13     while True:
14         if not q.empty():
15             value = q.get(True)
16             print("Get %s from queue..."%value)
17             time.sleep(random.random())
18         else:
19             break
20 
21 if __name__ == "__main__":
22     # 父进程创建 Queue, 并传给各个子进程
23     q = Queue()
24     pw = Process(target=write, args=(q, ))
25     pr = Process(target=read, args=(q, ))
26 
27     # 启动写入子进程,并等待结束
28     pw.start()
29     pw.join()
30 
31     # 启动读取子进程,并等待结束
32     pr.start()
33     pr.join()

进程池与队列合作实现进程间通信

 1 # 修改 import 中的 Queue 为 Manager
 2 from multiprocessing import Manager, Pool
 3 import os
 4 
 5 
 6 def reader(q):
 7     print("reader启动(%s),父进程为(%s)"%(os.getpid(), os.getppid()))
 8     for i in range(q.qsize()):
 9         print("reader从Queue获取到消息:%s"%q.get(True))
10 
11 def writer(q):
12     print("writer启动(%s),父进程为(%s)"%(os.getpid(), os.getppid()))
13     for i in "Always":
14         q.put(i)
15 
16 
17 if __name__ == "__main__":
18     print("(%s) start"%os.getpid())
19     q = Manager().Queue()  # 使用Manager中的Queue来初始化
20     po = Pool()
21     # 使用阻塞模式创建进程,这样就不需要咋reader中使用死循环了,可以让writer完全执行后,再reader
22     po.apply(writer, (q,))
23     po.apply(reader, (q,))
24 
25     po.close()
26     po.join()
27     print("(%s) End" % os.getpid())

利用Pool进程池实现简单的文件复制

 1 import os
 2 import time
 3 from multiprocessing import Pool
 4 
 5 def copyFile(oldPath, newPath, fileName):
 6     print("%s 准备复制中。。。"%fileName)
 7     with open("%s\%s"%(oldPath, fileName), 'r') as fr, open("%s\%s"%(newPath, fileName), 'w') as fw:
 8         for line in fr:
 9             fw.write(line)
10     time.sleep(1)
11 
12 
13 if __name__ == "__main__":
14     oldPath = r"file"
15     newPath = r"file-副本"
16 
17     os.mkdir(newPath)
18 
19     pool = Pool(5)
20 
21     fileList = os.listdir(oldPath)
22 
23     for fileName in fileList:
24         pool.apply_async(copyFile, (oldPath, newPath, fileName))
25 
26     pool.close()
27 
28     pool.join()
29 
30     print("文件复制完成....")
原文地址:https://www.cnblogs.com/alwaysInMe/p/7119021.html