桐花万里python路-高级篇-并发编程-02-进程

  • 主要概念
    • 正在进行的过程或任务
      • 单核+多道,实现多进程并发执行
    • 并发与并行
      • 并行:同时运行,只有具备多个cpu才能实现并行
      • 并发:伪并行,看起来是同时运行,单个cpu+多道技术。也属于并发
    • 同步与异步
      • 针对的是函数/任务的调用方式
      • 同步:就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态
      • 异步:是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成
    • 阻塞与非阻塞
      • 针对的是进程或线程
      • 阻塞:是当请求不能满足的时候就将进程挂起
      • 非阻塞:则不会挂起(阻塞)当前进程
    • 层次结构
      • Windows 所有进程地位相同,没有层次概念
      • Unix/Linux 以init为根的树形结构
    • 状态
      • 运行
      • 阻塞
      • 就绪
  • 进程操作
    • 系统初始化,进程中开启子进程
      • subprocess()
      • os.fork()
    • 开启方式
      • from multiprocessing import Processing
      • p = Processing(target=func,args=(a,)) ; p.start()
      • 继承Process类,实现run()方法
      • Windows下需要在 if  __name__ == '__main__'下
      • 查看进程ID
        • os.getpid()
        • 父进程ID:os.getppid()
    • 进程关系
      • 串行 join
        • 主进程等待子进程结束 p.join()
      • 守护进程daemon
        • 主进程不等待子进程 p.daemon = True
        • 守护进程内无法再开启子进程
      • 僵尸进程
        • 有害
        • 子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中
        • p.is_alive() ; p.terminate()
      • 孤儿进程
        • 无害
        • 父进程退出,而它的一个或多个子进程还在运行
        • 被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作
  • 进程同步
    • 进程之间数据不共享
    • 互斥(同步)锁
      • 由并发变成了串行,牺牲了运行效率,但避免了竞争
      • from multiprocessing import Process,Lock
      • lock=Lock();lock.acquire();lock.release()
      • 效率低(共享数据基于文件,而文件是硬盘上的数据)
      • 需要自己加锁处理
    • 进程通信IPC
      • 队列
        • 底层是管道和锁定
          • Queue 先进先出
        • from multiprocessing import Process,Queue
          • q.put(block=True,timeout=3)
          • q.get(block=True,timeout=3)
        • 生产者消费者模型
          • 通过容器,解决生产数据和处理数据强耦合问题,平衡生产者和消费者的处理能力
          • 通过阻塞队列进行通讯,生产者put数据,消费者get数据
          • 发送结束信号 JoinableQueue
            • 允许数据的使用者通知生产者数据已处理完
            • 通知进程是使用共享的信号和条件变量来实现的
              from multiprocessing import Process,Queue
              import time,random,os
              def consumer(q):
                  while True:
                      res=q.get()
                      if res is None:break #收到结束信号则结束
                      time.sleep(random.randint(1,3))
                      print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
              
              def producer(name,q):
                  for i in range(2):
                      time.sleep(random.randint(1,3))
                      res='%s%s' %(name,i)
                      q.put(res)
                      print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
              
              
              
              if __name__ == '__main__':
                  q=Queue()
                  #生产者们:即厨师们
                  p1=Process(target=producer,args=('包子',q))
                  p2=Process(target=producer,args=('骨头',q))
                  p3=Process(target=producer,args=('泔水',q))
              
                  #消费者们:即吃货们
                  c1=Process(target=consumer,args=(q,))
                  c2=Process(target=consumer,args=(q,))
              
                  #开始
                  p1.start()
                  p2.start()
                  p3.start()
                  c1.start()
              
                  p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
                  p2.join()
                  p3.join()
                  q.put(None) #有几个消费者就应该发送几次结束信号None
                  q.put(None) #发送结束信号
                  print('')
              View Code
      • 管道
        • PIPE 无竞争,无锁
        • 用于双向通信
      • 共享数据
        • from multiprocessing import Manager
          from multiprocessing import Manager,Process,Lock
          import os
          def work(d,lock):
              with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
                  d['count']-=1
          
          if __name__ == '__main__':
              lock=Lock()
              with Manager() as m:
                  dic=m.dict({'count':100})
                  p_l=[]
                  for i in range(100):
                      p=Process(target=work,args=(dic,lock))
                      p_l.append(p)
                      p.start()
                  for p in p_l:
                      p.join()
                  print(dic)
                  #{'count': 94}
  • 进程池
    • 实现并发
      • 不可能无限开启进程,通常有几个核就开几个进程
      • 不会产生新的进程
    • 创建进程池
      • Pool([numprocess  [,initializer [, initargs]]])
        • initializer:是每个工作进程启动时要执行的可调用对象,默认为None
        • numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
        • initargs:是要传给initializer的参数组
      • p.apply(func [, args [, kwargs]])
        • 在一个池工作进程中执行func(*args,**kwargs),然后返回结果
      • p.apply_async(func [, args [, kwargs]])异步
        • 结果是AsyncResult类的实例,callback是可调用对象,接收输入参数
        • 当func的结果变为可用时,将立即传递给callback
        • callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
        • p.apply_async().get()
      • p.close() 关闭进程池
      • p.join() 等待所有工作进程退出。此方法只能在close()或teminate()之后调用
      • 回调方法
        • 进程池中任何一个任务一旦处理完了,就立即告知主进程调用一个函数去处理该结果
        • 新的进程
    • concurrent.futures  
      • ProcessPoolExecutor
      • ThreadPoolExecutor
原文地址:https://www.cnblogs.com/zhujingxiu/p/8579340.html