Python之并发编程(二)进程

进程

  1. multiprocessing模块介绍

    1. python中的多线程无法利用CPU资源,在python中大部分情况使用多进程。python中提供了非常好的多进程包multiprocessing。
    2. multiprocessing模块用来开启子进程,并在子进程中执行功能(函数),该模块与多线程模块threading的编程接口类似。
    3. multiprocessing的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
  2. Process类的介绍

    1. 创建进程的类:

      Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
      
      强调:
      1. 需要使用关键字的方式来指定参数
      2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
      
    2. 参数介绍:

      group参数未使用,值始终为None
      
      target表示调用对象,即子进程要执行的任务
      
      args表示调用对象的位置参数元组,args=(1,2,'egon',)
      
      kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
      
      name为子进程的名称
      
    3. 方法介绍:

      p.start():启动进程,并调用该子进程中的p.run() 
      p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
      
      p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
      如果p还保存了一个锁那么也将不会被释放,进而导致死锁
      p.is_alive():如果p仍然运行,返回True
      
      p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,
      需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
      
    4. 属性介绍:

      p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
      
      p.name:进程的名称
      
      p.pid:进程的pid
      
      p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
      
      p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
      
  3. Process类的使用

    1. 一定要把开进程的代码写在if name=='main':下面

      开一个进程和主进程是并发的关系,我start一下就是先告诉操作系统我要开一个进程
      ,然而它不会等待,他会去执行下面的代码,完了他吧进程开始后,就开始执行了

      strat():方法的功能

        1.开启进程
        2.执行功能

  4. 开启进程的两种方式:

    1. 第一种:

      from multiprocessing import Process
      import time
      import random
      def piao(name):
          print('%s is piaoing'%name)
          time.sleep(random.randint(1,3))
          print('%s is piao end'%name)
      if __name__ =='__main__':
          p1 = Process(target=piao,kwargs={'name':'alex'})# 创建一个进程对象
          p2 = Process(target=piao,kwargs={'name':'alex'})
          p3 = Process(target=piao,kwargs={'name':'alex'})
          p1.start() # 只是向操作系统发出一个开辟子进程的信号,然后就执行下一行了.
          # 这个信号操作系统接收到之后,会从内存中开辟一个子进程空间,然后在将主进程所有数据copy加载到子进程,然后在调用cpu去执行.
          # 开辟子进程开销是很大的
          p2.start()
          p3.start()
          print('主进程')
      
      #第一种方式
      
    2. 第二种:

      from multiprocessing import Process
      import time
      import random
      import os
      class Piao(Process):
          def __init__(self,name):
              super().__init__() #必须继承父类的一些属性
              self.name = name
          def run(self):  #必须得实现一个run方法
              print(os.getppid(),os.getpid())
              print('%s is piaoing'%self.name)
              time.sleep(random.randint(1,3))
              print('%s is piao end'%self.name)
      if __name__ =='__main__':
          p1 = Piao('alex')
          p2 = Piao('wupeiqi')
          p3 = Piao('yuanhao')
          p1.start()
          p2.start()
          p3.start()
          print('主进程',os.getpid())
      
      #第二种方式
      
  5. 验证进程之间的空间隔离:

    # 接下来我们验证一下进程之间的互相隔离。
    
    # 在一个进程中
    x = 1000
    
     def task():
         global x
         x = 2
    
     task()
     print(x)
    # 在不同的进程中:
     from multiprocessing import Process
     import time
     x = 1000
    
     def task():
         global x
         x = 2
    
     if __name__ == '__main__':
         p = Process(target=task)
         p.start()
         time.sleep(3)
         print(x)
    
    #代码验证
    
  6. 进程对象的join方法:

    from multiprocessing import Process
    import time
    
    # 父进程等待子进程结束之后在执行
    # 方法一 加sleep 不可取!
    
     def task(n):
         time.sleep(3)
         print('子进程结束....')
    
     if __name__ == '__main__':
         p = Process(target=task,args=('太白金星',))
         p.start()
         time.sleep(5)
         print('主进程开始运行....')
    
    # 这样虽然达到了目的,
    # 1,但是你在程序中故意加sleep极大影响程序的效率。
    # 2,sleep(3)只是虚拟子进程运行的时间,子进程运行完毕的时间是不固定的。
    
    
    # 方法二: join
    
     from multiprocessing import Process
     import time
    
    
     def task(n):
         time.sleep(3)
         print('子进程结束....')
    
    
     if __name__ == '__main__':
         p = Process(target=task,args=('太白金星',))
         p.start()
         p.join()  # 等待p这个子进程运行结束之后,在执行下面的代码(主进程).
         print('主进程开始运行....')
    
    # 接下来我要开启十个子进程,先看看效果
    
     from multiprocessing import Process
     import time
    
     def task(n):
         print('%s is running' %n)
    
     if __name__ == '__main__':
         for i in range(1, 11):
             p = Process(target=task,args=(i,))
             p.start()
    #         '''
    #         我这里是不是运行十个子进程之后,才会运行主进程?当然不会!!!
    #         1,p.start()只是向操作系统发送一个请求而已,剩下的操作系统在内存开启进程空间,运行进程程序不一定是马上执行。
    #         2,开启进程的开销是比较大的。
    #         '''
         print('主进程开始运行....')
        
    # 那么有人说,老师我对这个不理解,我给你拆解开来。
    
     from multiprocessing import Process
     import time
    
     def task(n):
         print('%s is running' %n)
    
     if __name__ == '__main__':
         p1 = Process(target=task,args=(1,))
         p2 = Process(target=task,args=(2,))
         p3 = Process(target=task,args=(3,))
         p4 = Process(target=task,args=(4,))
         p5 = Process(target=task,args=(5,))
    
         p1.start()
         p2.start()
         p3.start()
         p4.start()
         p5.start()
    
         print('主进程开始运行....')
    
    # 接下来 实现起多子个进程,然后等待这些子进程都结束之后,在开启主进程。
    
     from multiprocessing import Process
     import time
    
     def task(n):
         time.sleep(3)
         print('%s is running' %n)
    
     if __name__ == '__main__':
         start_time = time.time()
         p1 = Process(target=task,args=(1,))
         p2 = Process(target=task,args=(2,))
         p3 = Process(target=task,args=(3,))
    #     # 几乎同一个时刻发送三个请求
         p1.start()
         p2.start()
         p3.start()
    #     # 对着三个自己成使用三个join
    
         p1.join()
         p2.join()
         p3.join()
    
         print(time.time() - start_time,'主进程开始运行....')
         # 3s 多一点点这是来回切换的所用时间。
    
    # 那么在进行举例:
    
     from multiprocessing import Process
     import time
    
     def task(n):
         time.sleep(n)
         print('%s is running' %n)
    
     if __name__ == '__main__':
         start_time = time.time()
         p1 = Process(target=task,args=(1,))
         p2 = Process(target=task,args=(2,))
         p3 = Process(target=task,args=(3,))
         # 几乎同一个时刻发送三个请求
         p1.start()
         p2.start()
         p3.start()
         # 对着三个自己成使用三个join
    
         p1.join()  # 1s
         p2.join()  # 2s
         p3.join()  # 3s
    
         print(time.time() - start_time,'主进程开始运行....')
        # 3s 多一点点这是来回切换的所用时间。
        
    # 利用for循环精简上面的示例:
    
    
     from multiprocessing import Process
     import time
    
     def task(n):
         time.sleep(1)
         print('%s is running' %n)
    
     if __name__ == '__main__':
         start_time = time.time()
          for i in range(1,4):
              p = Process(target=task,args=(i,))
              p.start()
              p.join()
    
         p1 = Process(target=task,args=(1,))
         p2 = Process(target=task,args=(2,))
         p3 = Process(target=task,args=(3,))
    #     # 几乎同一个时刻发送三个请求
         p1.start()
         p1.join()
         p2.start()
         p2.join()
         p3.start()
         p3.join()
         # 上面的代码,p1.join()他的作用:你的主进程代码必须等我的p1子进程执行完毕之后,在执行
    #     # p2.start()这个命令是主进程的代码。
    #     # 而 如果你这样写:
    #     '''
    #     p1.join()
    #     p2.join()
     #    p3.join()
    #     '''
    
         print(time.time() - start_time,'主进程开始运行....')
    
    # 所以你上面的代码应该怎么写?
    
    
     from multiprocessing import Process
     import time
    
     def task(n):
         time.sleep(3)
         print('%s is running' %n)
    
     if __name__ == '__main__':
         p_l = []
         start_time = time.time()
         for i in range(1,4):
             p = Process(target=task,args=(i,))
             p.start()
             p_l.append(p)
         # 对着三个自己成使用三个join
         for i in p_l:
             i.join()
         print(time.time() - start_time,'主进程开始运行....')
    
  7. 进程对象的其他属性(了解):

    # from multiprocessing import Process
    # import time
    # import os
    #
    # def task(n):
    #     time.sleep(3)
    #     print('%s is running' %n,os.getpid(),os.getppid())
    #
    # if __name__ == '__main__':
    #     p1 = Process(target=task,args=(1,),name = '任务1')
    #     # print(p1.name) # 给子进程起名字
    #     # for i in range(3):
    #     #     p = Process(target=task, args=(1,))
    #     #     print(p.name)  # 给子进程起名字
    #     p1.start()
    #     # p1.terminate()
    #     # time.sleep(2)  # 睡一会,他就将我的子进程杀死了。
    #     # print(p1.is_alive())  # False
    #     print(p1.pid)
    #     # print('主')
    #     print(os.getpid())
    
  8. 守护进程

    1. 守护进程的概念:

      # 守护进程:
      # 古时候 太监守护这个皇帝,如果皇帝驾崩了,太监直接也就死了.
      # 子进程守护着主进程,只要主进程结束,子进程跟着就结束,
      
    2. 代码示例:

      from multiprocessing import Process
      import time
      
      def task(name):
          print(f'{name} is running')
          time.sleep(2)
          print(f'{name} is gone')
      
      if __name__ == '__main__':
          p = Process(target=task, args=('立业',))
          p.daemon = True #将子进程p设置成守护进程,只要主进程结束,守护进程也马上结束
          p.start()
          time.sleep(1)
          print('==主进程执行')
      #打印结果
      # 立业 is running  #主进程结束,子进程也随之结束,
      # ==主进程执行
      
  9. 僵尸进程与孤儿进程:

    1. 僵尸进程和孤儿进程只会出现在Unix,linux,macos上面,windows上不会出现

    2. 进程的概念:

      #1:主进程需要等待子进程结束之后,主进程才结束
      	主进程时刻监视子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收
      
    3. 为什么主进程不在子进程结束后马上对其进行回收

      #1:主进程和子进程时异步关系,主进程无法马上捕获子进程什么时候结束
      #2:如果子进程结束后马上在内存中释放资源,主进程就没有办法监视子进程的状态了
      
    4. Unix针对上面的问题,提供了一个机制

      #所有的子进程结束之后,立马会释放文件的操作链接,内存中的大部分数据,但会保留一些内容:进程号,结束时间,运行状态,等待主进程监测,回收
      
    5. 僵尸进程:

      #所有的子进程在结束之后,在被主进程回收之前都会进入僵尸状态
      
    6. 僵尸进程是有害的:

      #如果父进程不对僵尸进程进行回收,产生大量的僵尸进程,这样就会占用内存,占用进程pid号
      
    7. 僵尸进程如何解决:

      #父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由init进行回收
      
    8. 孤儿进程:

      #父进程由于某种原因结束了,但是你的子进程还在运行,这样你的这些子进程就变成了孤儿进程,你的父进程如果结束了,你的所有的孤儿进程,就会被init进程回收,init就变成了父进程,对孤儿进程进行回收
      
  10. 互斥锁:

    1. 什么是互斥锁:

      #互斥锁就是在保证子进程串行的同时,也保证了子进程执行顺序的随机性,以及数据的安全性
      
      使用互斥锁的注意点:
          使用互斥锁不能连续锁,不然会造成阻塞和死锁
      
    2. 代码示例:

      from multiprocessing import Process
      from multiprocessing import Lock
      import time
      import os
      import random
      import sys
      def task1(lock):
          lock.acquire()
          print(f'{os.getpid()}开始打印了')
          time.sleep(random.randint(1,3))
          print(f'{os.getpid()}打印结束了')
          lock.release()
      
      def task2(lock):
          lock.acquire()
          print(f'{os.getpid()}开始打印了')
          time.sleep(random.randint(1, 3))
          print(f'{os.getpid()}打印结束了')
          lock.release()
      
      def task3(lock):
          lock.acquire()
          print(f'{os.getpid()}开始打印了')
          time.sleep(random.randint(1, 3))
          print(f'{os.getpid()}打印结束了')
          lock.release()
      
      if __name__ == '__main__':
          lock=Lock()
          for i in  ['task1','task2','task3']:
              p=Process(target=getattr(sys.modules[__name__],i),args=(lock,))
              p.start()
      
    3. 示意图:

    4. join和Lock的区别:

      #共同点:
      	都可以把并发改成串行,保证了执行的顺序
      #不同点:
      	join是人为的设置顺序,Lock锁时让其争抢顺序,保证了公平性
      
    5. 基于文件的进程之间的通信:

      #进程在内存级别是隔离的,但是文件在磁盘上
      #使用lock锁实现文件上的通讯
      from multiprocessing import Process
      from multiprocessing import Lock
      import json
      import time
      import os
      import random
      # 当多个进程共抢一个数据时,如果要保证数据的安全,必须要串行.
      # 要想让购买环节进行串行,我们必须要加锁处理.
      
      def search():
          time.sleep(random.randint(1,3))  # 模拟网络延迟(查询环节)
          with open('ticket.json',encoding='utf-8') as f1:
              dic = json.load(f1)
              print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}')
      def paid():
          with open('ticket.json', encoding='utf-8') as f1:
              dic = json.load(f1)
          if dic['count'] > 0:
              dic['count'] -= 1
              time.sleep(random.randint(1,3))  # 模拟网络延迟(购买环节)
              with open('ticket.json', encoding='utf-8',mode='w') as f1:
                  json.dump(dic,f1)
              print(f'{os.getpid()} 购买成功')
      def task(lock):
          search()
          lock.acquire()
          paid()
          lock.release()
      
      if __name__ == '__main__':
          mutex = Lock()
          for i in range(6):
              p = Process(target=task,args=(mutex,))
              p.start()
      #当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行.
      #互斥锁: 可以公平性的保证顺序以及数据的安全.
      
    6. 基于文件的进程通信的特点

      #1:效率低
      #2:自己加锁麻烦而且容易出现死锁
      
  11. 队列

    1. 队列的特点:

      #1:把队列当成一个容器,这个容器可以承载一些数据
      #2:队列的特性是数据保持先进先出原则,FIFO
      
    2. 队列示例代码:

      from multiprocessing import Queue
      q = Queue(3)
      
      q.put(1)
      q.put('alex')
      q.put([1,2,3])
      q.put(5555)  # 当队列满了时,在进程put数据就会阻塞.
      
      print(q.get())
      print(q.get())
      print(q.get())
      print(q.get())  # 当数据取完时,在进程get数据也会出现阻塞,直到某一个进程put数据.
      
      
      from multiprocessing import Queue
      q = Queue(3)  # maxsize
      
      q.put(1)
      q.put('alex')
      q.put([1,2,3])
      # q.put(5555,block=False)
      #
      print(q.get())
      print(q.get())
      print(q.get())
      print(q.get(timeout=3))  # 阻塞3秒,3秒之后还阻塞直接报错.
      # print(q.get(block=False))
      
      # block=False 只要遇到阻塞就会报错.
      
    3. 使用基于文件和队列的进程通信方式的优缺点:

      #基于文件的进程通讯方式,在写入文件的时候耗费性能,效率低,在枷锁的时候,又可能出现锁死或递归锁的情况,相比之下队列的效率更高,也不会出现锁死或者递归锁的情况
      
原文地址:https://www.cnblogs.com/zhangdadayou/p/11431904.html