Python3 从零单排24_多进程

  进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程

  1.开启进程的两种方式:

 1 from multiprocessing import Process
 2 import time
 3 import os
 4 
 5 # 方式一:
 6 def task(name):
 7     print("%s is running...,pid: %s; ppid: %s " % (name, os.getpid(), os.getppid()))  # 拿到进程pid以及父进程pid
 8     time.sleep(1)
 9     print("%s is done..." % name)
10 
11 # 方式二
12 class CreateProcess(Process):
13     def __init__(self, name):
14         super().__init__()
15         self.name = name  # 如果没指定名字,会有自己的名称,类名+编号
16 
17     def run(self):
18         print("%s is running...,pid: %s; ppid: %s " % (self.name, os.getpid(), os.getppid()))
19         time.sleep(3)
20         print("%s is done..." % self.name)
21 
22 
23 if __name__ == "__main__":
24     # 方式一:
25     p1 = Process(target=task, args=("方式一子进程",))
26     p1.start()  # 告诉操作系统帮忙开启新的子进程,操作系统开辟一个新的内存空间给子进程使用,并把主进程的内存数据复制到子进程的内存空间里
27     print(p1.name)  # 进程名称,Process-编号 也可以自己指定
28     p4 = Process(target=task, name="我是方式一的子进程4444", args=("方式一子进程4",))
29     print(p4.name)
30 
31     # 方式二:
32     p2 = CreateProcess("方式二子进程1")
33     p3 = CreateProcess("方式二子进程2")
34     p2.start()
35     p3.start()
36 
37     print(p1.is_alive())  # true
38     print(p3.is_alive())  # true
39     p3.terminate()  # 杀调子进程,将子进程变成僵尸进程。实质是发送杀掉进程命令给操作系统,操作系统回收内存资源,保留子进程状态信息
40     p1.join()  # 等待子进程p1运行结算才往下运行
41     print("after terminate p3:", p3.is_alive())  # false
42     print(p1.pid, p2.pid, p3.pid)  # 查看子进程的pid
43     print(p1.is_alive())  # 判断进程存活状态,因为上面join了,所以p1已经是结束了的,只是保留了状态,所以false
44     print("主进程结束! pid: %s" % os.getpid())  # 拿到进程pid
View Code

  僵尸进程:子进程运行完后,内存空间被操作系统收回,但是进程状态需要保留给父进程查看; 也就是说子进程实际上是死了,但是还留有状态信息给父进程可查,等父进程也运行完后,才会清理掉这个已经死掉的子进程状态信息。

  孤儿进程:父进程挂掉后,子进程还在运行,这个时候就由操作系统的 init(linux操作系统中国呢所有进程的父进程) 进程监管/回收这些子进程

  2.守护进程 :子进程设置为主进程的守护进程,主进程运行完(重点,主进程运行完!!!),子进程也跟着死掉。

 1 from multiprocessing import Process
 2 import time
 3 class my_process_thread(Process):
 4     def run(self):
 5         print("%s is running" % self.name)
 6         time.sleep(1)
 7         print("%s is done" % self.name)
 8 
 9 
10 if __name__ == "__main__":
11     p1 = my_process_thread()
12     p1.daemon = True  # 设置为守护线程后,这里主线程运行完,p1就死掉了,不会打印进程1
13     p1.start()
14     p2 = my_process_thread()
15     p2.start()
16     print("主线程 is over !")
View Code

  3.互斥锁

 1 # 问题:竞争资源会出现数据错乱,比如多个进程竞争终端,都要打印数据,大家都在打印,数据就乱了。
 2 from multiprocessing import Process
 3 import time
 4 class my_process_thread(Process):
 5     def run(self):
 6         print("%s is running 第一行" % self.name)
 7         time.sleep(1)
 8         print("%s is running 第二行" % self.name)
 9         time.sleep(1)
10         print("%s is running 第三行" % self.name)
11         time.sleep(1)
12         print("%s is done" % self.name)
13 
14 
15 if __name__ == "__main__":
16     for i in range(3):
17         p = my_process_thread()
18         p.start()
19     print("主线程 is over !")
20 
21 
22 
23 # 主线程 is over !
24 # my_process&thread-1 is running 第一行
25 # my_process&thread-2 is running 第一行
26 # my_process&thread-3 is running 第一行
27 # my_process&thread-1 is running 第二行
28 # my_process&thread-2 is running 第二行
29 # my_process&thread-3 is running 第二行
30 # my_process&thread-1 is running 第三行
31 # my_process&thread-3 is running 第三行Myprocess-2 is running 第三行
32 # 
33 # my_process&thread-1 is done
34 # my_process&thread-3 is done
35 # my_process&thread-2 is done
36 '''
37 
38 # 互斥锁的意义在于将并发变回串行,一个进程用完下个进程才可以用,数据就有序了,但是效率降低,取决你需要数据安全性还是效率。
39 '''
40 from multiprocessing import Process, Lock
41 import time
42 class my_process&thread(Process):
43     def __init__(self, lock):
44         super().__init__()
45         self.lock = lock
46 
47     def run(self):
48         self.lock.acquire()  # 申请锁
49         print("%s is running 第一行" % self.name)
50         time.sleep(1)
51         print("%s is running 第二行" % self.name)
52         time.sleep(1)
53         print("%s is running 第三行" % self.name)
54         time.sleep(1)
55         print("%s is done" % self.name)
56         self.lock.release()  # 释放锁
57 
58 
59 if __name__ == "__main__":
60     lock = Lock()   # 实例化锁,子进程拿到锁才可以运行
61     for i in range(3):
62         p = my_process&thread(lock)
63         p.start()
64     print("主线程 is over !")
65 
66 
67 # 主线程 is over !
68 # my_process&thread-1 is running 第一行
69 # my_process&thread-1 is running 第二行
70 # my_process&thread-1 is running 第三行
71 # my_process&thread-1 is done
72 # my_process&thread-2 is running 第一行
73 # my_process&thread-2 is running 第二行
74 # my_process&thread-2 is running 第三行
75 # my_process&thread-2 is done
76 # my_process&thread-3 is running 第一行
77 # my_process&thread-3 is running 第二行
78 # my_process&thread-3 is running 第三行
79 # my_process&thread-3 is done
View Code

  4.join和互斥锁的区别
  join只能让整个子进程串行,互斥锁可以让局部代码串行(比如修改数据部分串行,其他查询可以继续并行)

 1 from multiprocessing import Process,Lock
 2 import json
 3 import time
 4 
 5 def search(name):
 6     time.sleep(1)
 7     dic=json.load(open('db.txt','r',encoding='utf-8'))
 8     print('<%s> 查看到剩余票数【%s】' %(name,dic['count']))
 9 
10 
11 def get(name):
12     time.sleep(1)
13     dic=json.load(open('db.txt','r',encoding='utf-8'))
14     if dic['count'] > 0:
15         dic['count']-=1
16         time.sleep(3)
17         json.dump(dic,open('db.txt','w',encoding='utf-8'))
18         print('<%s> 购票成功' %name)
19     else:
20         print('<%s> 购票失败' %name)
21 
22 def task(name,):
23     search(name)
24     # mutex.acquire()
25     get(name)
26     # mutex.release()
27 
28 if __name__ == '__main__':
29     # mutex=Lock()
30     for i in range(10):
31         p=Process(target=task,args=('路人%s' %i,))
32         p.start()
33         p.join()
View Code

  5.队列

  进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
  创建队列的类(底层就是以管道和锁定的方式实现):
  Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

  maxsize是队列中允许最大项数,省略则无大小限制。但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

 1 from multiprocessing import Queue
 2 q = Queue(3)
 3 
 4 q.put(123)  # 往队列里塞一个数据,可以是任何数据类型
 5 q.put([4, 5, 6])
 6 print(q.full())  # 判断队列大小是否已经达到最大项数,这里才塞了俩数据,所以false
 7 q.put({"a": "apple"})
 8 print(q.full())  # 已经达到最大项数了,True
 9 print(q.empty())  # 判断队列里的是否不存在数据了,这里队列了有三数据,所以为false
10 
11 print(q.get())  # 根据先进先出原则,从队列里取一个数据
12 print(q.get())
13 print(q.get())
14 print(q.empty())  # True
15 
16 
17 # 生产者/消费者模型
18 from multiprocessing import Queue
19 from multiprocessing import Process
20 import time
21 
22 
23 def producer(name, q):
24     for i in range(5):
25         time.sleep(0.5)
26         res = "%s生产的馒头%s" % (name, i)
27         q.put(res)
28         print(res)
29 
30 
31 def consumer(name, q):
32     while True:
33         time.sleep(0.2)
34         res = q.get()
35         if not res:
36             break
37         print("%s 吃了 %s" % (name, res))
38 
39 
40 if __name__ == "__main__":
41     q = Queue()
42     p_lis = []
43     for i in range(3):
44         p = Process(target=producer, args=("xg%s" % i, q))
45         p_lis.append(p)
46         p.start()
47     s = Process(target=consumer, args=("yy", q))
48     s.start()
49     for p in p_lis:
50         p.join()
51     q.put(None)  # 注意,有几个消费者就要发送几个None
52     print("over")
View Code

  6.JoinableQueue 的使用 可以用join方法的Queue

  JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
  q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

 1 from multiprocessing import JoinableQueue
 2 from multiprocessing import Process
 3 import time
 4 
 5 
 6 def producer(name, q):
 7     for i in range(5):
 8         time.sleep(0.5)
 9         res = "%s生产的馒头%s" % (name, i)
10         q.put(res)
11         print(res)
12     q.join()  # 等待q结束(即q队列数据全部调用了q.task_done()方法,才往下继续运行代码)
13 
14 
15 def consumer(name, q):
16     while True:
17         time.sleep(0.2)
18         res = q.get()
19         print("%s 吃了 %s" % (name, res))
20         q.task_done()  # 每消费一个数据,调用一次task_done方法,为了给join方法用
21 
22 
23 if __name__ == "__main__":
24     q = JoinableQueue()
25     p_lis = []
26     for i in range(3):
27         p = Process(target=producer, args=("xg%s" % i, q))
28         p_lis.append(p)
29         p.start()
30     s = Process(target=consumer, args=("yy", q))
31     s.daemon = True  # 主线程运行完,就销毁掉生产者的进程
32     s.start()
33     for p in p_lis:
34         p.join()
35     print("over")  # 走到这一步说明,生产者已经运行完了,生产者运行完,代表队列里的数据都调用了task_done方法,也就是消费者运行也结束了
View Code
原文地址:https://www.cnblogs.com/znyyy/p/10174878.html