进程模块multiprocessing

进程模块multiprocessing

进程的创建 -用Process

  • 注册进程Precess类

      #导入Process类
      from multiprocessing import Process
      
      #创建一个函数
      def fun():pass	 
      #将这个函数的运行注册进一个新的进程中
      p = Process(target=fun)	#注册进程,传入的是地址
      p.start()	#开始这个进程
    

进程号的查看 -os.getpid() -os.getppid()

  • 查看进程的进程号os.getpid()

      import os
      from multiprocessing improt Process
      #创建一个函数,获取这个进程的进程号
      def func():
      	print('子进程:'os.getpid())
    
      p = Process(target=fun)	#注册进程
      p.start	#开始进程
      print('父进程: ', os.getpid())	#打印主进程的进程号
      
      #结果
      子进程: 1234
      父进程: 6789
    
  • 查看父进程os.getppid()

      import os
      from multiprocessing improt Process
      #创建一个函数,获取这个进程的进程号
      def func():
      	print('子进程的父进程:'os.getpiid())
    
      p = Process(target=fun)	#注册进程
      p.start	#开始进程
      print('父进程的父进程: ', os.getpiid())	#打印主进程的进程号
      
      #结果
      子进程的父进程: 6789	#当前进程 
      父进程的父进程: 1470	#pycharm运行的进程
    

给进程传入参数 -args=(tuple)

  • 新进程给函数传参数

      '''注册进程'''
      from multiprocessing import Process
      def fun(args):pass		#函数需要传入一个参数 
      
      p = Process(target=fun, args=(canshu1,))	#给args传入就行,但是传入的是一个元祖	
      	
      p.start()	#开始这个进程
    

进程的开始结束 -主进程和子进程的存活关系

  • 默认情况下

    • 个进程互不干扰,主进程的结束不影响子进程的结束

        import time
        from multiprocessing import Process
        def fun():
            time.sleep(3)
            print('子进程结束')
        if __name__ == '__main__':	#启动进程必须判断这个
        	p = Process(target=fun)
        	p.start()
        	
        	print('主进程结束')
        #结果
        主进程结束
        子进程结束
      
  • join()待子进程结束后程序继续进行

      import time
      from multiprocessing import Process
      def fun():
          print(1234)
          time.sleep(3)
      if __name__ == '__main__':    
      	p = Process(target=fun)
      	p.start()
      	p.join()	#这里是子进程的结束位置,父进程等待子进程结束后继续运行
      	print('您好')
      #结果
      1234
      您好	#停顿3秒后您好才被显示出来
    

开始多个子进程

  • 开始多个子进程

      from multiprocessing import Process
      def fun(num):
      	print('进程{}正在运行'.format(num))
      	
      if __name__ == '__main__':	
      	for i in range(3):
      		p = Process(target=fun, i)
      		p.start()
      #结果
      进程0正在运行
      进程1正在运行
      进程2正在运行
    
  • 等待全部子进程结束后运行下面的代码

      from multiprocessing import Process
      def fun(num):
      	print('进程{}正在运行'.format(num))
      	
      if __name__ == '__main__':
      	p_list = []
      	for i in range(3):
      		p = Process(target=fun, i)
      		p_list.append(p)
      		p.start()
      	[p.join() for p in p_list]	#关键之处
      	prnt('所有的子进程全部结束')
      #结果	#__结果的输出进程不一定是按照顺序输出的__
      进程2正在运行
      进程0正在运行
      进程1正在运行
      所有的子进程全部结束
    

利用类创建进程

	from multiprocessing import Process
	import os
	class MyProcess(Process):
		def __init__(self, arg1, arg2):
			super().__init__()	#必须继承父类的__init__()
			self.arg1 = arg1
			self.arg2 = arg2
		
	    def run(self):	#必须有一个run()方法
	        print(os.getpid())
	if __name__ == '__main__':
		p = MyProcess()
		p.start()		#调用run()方法

多进程之间的数据隔离问题

  • 父进程和子进程之间的数据是隔离开的

守护进程

  • 子进程-->守护进程 -设置daemon = True
    • 守护进程:主进程结束,守护进程强制被结束
    • 守护进程结束看主进程是否结束,不看其他子进程是否结束

进程类的其他方法及其属性

方法

  • p.is_alive() 检测子进程是否存在
  • p.terminate() 结束一个子进程,并不是立即结束,需要操作系统响应
  • p.start() 开始一个进程

属性

  • p.name 查看进程的名字
  • p.pid 查看进程号
  • p.daemon = Ture 将进程转换成守护进程

进程锁 - lock模块 -涉及数据安全问题

  • 给一段代码加锁之后只能被一个进程同时调用

      import lock
      lock = lock() 
      
      lock.acquire()	#钥匙
      '''被加锁的代码块'''
      lock.release()	#上交钥匙
    

进阶系列

进程的进阶

信号量 -multiprocessing.Semaphore()

  • 限制进程访问的代码块

  • 有多把钥匙的房间

      from multiprocessing import Semaphore
      from multiprocessing import Process
      from time import sleep
      
      def func(i, sem):
          sem.acquire()
          print('{}进去唱歌了'.format(i))
          sleep(1)
          print('{}唱歌结束出来了'.format(i))
          sem.release()
      
      if __name__ == '__main__':
          sem = Semaphore(4)
          for i in range(10):
              p = Process(target=func, args=(i, sem))
              p.start()
      #结果
      0进去唱歌了
      1进去唱歌了
      2进去唱歌了
      3进去唱歌了
      0唱歌结束出来了
      4进去唱歌了
      1唱歌结束出来了
      5进去唱歌了
      2唱歌结束出来了
      6进去唱歌了
      3唱歌结束出来了
      7进去唱歌了
      4唱歌结束出来了
      8进去唱歌了
      5唱歌结束出来了
      9进去唱歌了
      6唱歌结束出来了
      7唱歌结束出来了
      8唱歌结束出来了
      9唱歌结束出来了
    

事件 -multiprocessing.Event()

  • 一个信号可以使得所有的进程进入阻塞状态

  • 也可以控制所有的进程接触阻塞

  • 一个事件被创建出来默认是阻塞状态

      #事件创建默认是阻塞的
      from multiprocessing import Event
      
      event = Event()	#创建一个事件叫event
      print(event.is_set())	#检查事件的阻塞,False为阻塞,True为非阻塞状态
      event.wait()	#阻塞状态会等待,非阻塞状态会运行下面代码
      print('这句话不能被打印')
      #结果
      False
      ————————————————————————————————————————————————
      #下面是通过设置改变阻塞状态
    
    • .is_set()用来查看事件的阻塞状态
  • .set().is_set()设置成True,变成非阻塞状态

      #关闭事件的阻塞
      from multiprocessing import Event
      
      event = Event()
      event.set()
      event.wait()
      print(event.is_set())
      print('现在阻塞被关闭')
      #结果
      True
      现在阻塞被关闭
    
  • enent.clear() 再次打开阻塞

      #再次打开阻塞
      from multiprocessing import Event
      
      event = Event()
      event.set()
      event.wait()
      print('正常打印消息')
      
      event.clear()	#开启阻塞
      event.wait()
      print('这条消息将不被输出')
      #结果
      正常打印消息	
    

红绿灯案例

	from multiprocessing import Event, Process
	import time, random
	
	def light(e,):
	    while 1:
	        if e.is_set():
	            e.clear()  # 开启阻塞
	            print('33[31mO33[0m')  # 红灯
	        else:
	            e.set()
	            print('33[32mO33[0m')  # 绿灯
	
	        time.sleep(2)
	
	
	def car(e, i):
	    if not e.is_set():
	        print('{}正在等待'.format(i))
	        e.wait()
	    print('{}通过路口'.format(i))
	
	if __name__ == '__main__':
	
	    e = Event()
	    tra = Process(target=light, args=(e,))
	    tra.start()
	    i = 0
	    while 1:
	        p = Process(target=car, args=(e, i))
	        p.start()
	        time.sleep(random.random())
	        i += 1
    #结果
    绿灯
	0通过路口
	1通过路口
	2通过路口
	3通过路口
	4通过路口
	红灯
	5正在等待
	6正在等待
	7正在等待
	8正在等待
	绿灯
	5通过路口
	7通过路口
	8通过路口
	6通过路口
	9通过路口
	10通过路口
	11通过路口
	12通过路口
	红灯
	13正在等待
	14正在等待
	15正在等待
	16正在等待

进程之间的通信 -队列和管道

进程间的通信 -IPC

队列

  • 先进先出
  • 满了或者队列空了都会发生阻塞
    • from multiprocessing import Queue 导入队列

    • q = Queue() 创建队列对象

    • .put() 放数据,满了就阻塞

    • .get() 取数据,取空就阻塞

    • .full() 判断队列是否满了

    • .enpty() 判断队列是否为空

    • .put_nowait() 向队列放元素,队列满了就报错

    • .get_nowait() 取元素,取空就报错

        from multiprocessing import Queue
        
        q = Queue(5)	#队列允许放置5个元素,可选参数
        q.put(1)
        q.put(2)
        q.put(3)
        q.put(4)
        q.put(5)
        print(q.pull())	#判断队列是否满了
        print(q.empty())	#判断队列是否空了
        for i in range(5):
        	print(q.get())
      

利用队列完成两个进程的数据交换 -IPC

from multiprocessing import Queue, Process

def put_thing(q):
    for i in range(5):
        q.put(i)

def get_thing(q): 
    print(q.get())

if __name__ == '__main__':

    q = Queue()
    p_put = Process(target=put_thing, args=(q,))
    p_put.start()

    for i in range(5):
        p_get = Process(target=get_thing, args=(q,))
        p_get.start()

队列模型 -生产者消费者模型

模型一 Queue模块

  • 需要向消费者告知结束关键字

      from multiprocessing import Process, Queue
      import time
      import random
      
      def producer(q, name, food):
          for i in range(3):
              time.sleep(random.random())
              msg = '{}生产了第{}泡{}'.format(name, i, food)
              print(msg)
              q.put(i)
      
      def customer(q, name, food):
          while 1:
              i = q.get()
              print(i)
              if i == 'nihao':
                  print('getNoneR')
                  break
      
              time.sleep(random.random())
              print('{}消费了第{}泡{}'.format(name, i, food))
      
      if __name__ == '__main__':
          q = Queue()
          pname = 'Alex'
          cname = '你'
          food = '狗屎'
          produ_proce = Process(target=producer, args=(q, pname, food))
          cust_proce = Process(target=customer, args=(q, cname, food))
      
          produ_proce.start()
          cust_proce.start()
          produ_proce.join()
          q.put('nihao')  #生产者消费结束向队列添加一个结束关键字 
    

模型二 -改进版JoinableQueue()

  • 改进版结合守护进程使用
  • JoinableQueue()Queue()的区别
    • JoinableQueue()加入了两个方法
      • .task_done() 消费者每消费一次就要运行一次,用来减去模块中的计数
      • .join() 等待队列中元素被取完,否则阻塞
    • from multiprocessing import Process, JoinableQueue
      import time
      import random
      	
      def producer(q, name, food):
          for i in range(10):
              time.sleep(random.random())
              msg = '33[31m{}生产了第{}泡{}33[0m'.format(name, i, food)
              print(msg)
              q.put(i)
          q.join()	#判断队列是否被取完,取完后才执行,否则阻塞
      	
      def customer(q, name, food):
          while 1:
              i = q.get()
              time.sleep(random.random())
              print('{}消费了第{}泡{}'.format(name, i, food))
              q.task_done()	#每取一个元素,让队列的计数器减1
      	
      if __name__ == '__main__':
          q = JoinableQueue()
          pname = 'Alex'
          cname = '你'
          food = '狗屎'
          produ_proce = Process(target=producer, args=(q, pname, food))
          cust_proce = Process(target=customer, args=(q, cname, food))
          produ_proce.daemon = True	#将子进程设置成守护进程
          cust_proce.daemon = True	#将子进程设置成守护进程
          
          produ_proce.start()
          cust_proce.start()
      	
          produ_proce.join()
原文地址:https://www.cnblogs.com/liliudong/p/9732488.html