并发编程

一, multiprocessing模块

  • python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.Python提供了multiprocessing.
  • multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似.
  • multiprocessing模块的功能众多: 支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件.
  • 需要再次强调的一点是: 与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内.

二, Process类的介绍

  • 创建进程的类:

    Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    
  • 参数介绍:

    • group参数未使用,值始终为None
    • target表示调用对象,即子进程要执行的任务
    • args表示调用对象的位置参数元组
    • kwargs表示调用对象的字典
    • name为子进程的名称
  • 方法介绍:

    • p.start(): 启动进程,并调用该子进程的p.run()
    • p.run(): 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
    • p.terminate(): 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况.如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    • p.is_alive(): 如果p仍然运行,返回True
    • p.join([timeout]): 主线程等待p终止(强调: 是主线程处于等的状态,而p是处于运行的状态).timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程.阻塞
  • 属性介绍:

    • p.daemon: 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    • p.name: 进程的名称

    • p.pid: 进程的pid

      • 进程在内存中开启多个,操作系统如何区分这些进程,每个进程都有一个唯一标识

        # 在终端查看进程的pid  tasklist(查看现阶段运行的所有进程)
        # 一个程序可以有多个进程
        
        # 在终端查看指定的进程的pid  tasklist| findstr pycharm
        
        # 通过代码查看pid
        import os
        print(os.getpid())  # 当前进程id
        print(os.getppid())  # 查看父进程id
        
    • p.ppid: 父进程的pid

    • p.exitcode: 进程在运行时为None,如果为–N,表示被信号N结束

    • p.authkey: 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串.这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功.

三, Process类的使用

  1. 创建开启子进程的两种方式

    # 开启进程的2种方式
    # 第一种方式
    from multiprocessing import Process
    import time
    def task(name):
        print(f'{name} is running')
        time.sleep(9)
        print(f'{name} is done')
    
    if __name__ == '__main__':  # windows必须在main下开启多进程
        p = Process(target=task, args=('子进程',))    # args一定是一个元组
        p.start()   # 通知操作系统在内存种开辟一个空间,将p这个进程放进去,让cpu执行
        print('===>主进程')
    ----------------------------------------------------
    # 第二种方式
    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self, name):
            super().__init__()  # 必须继承父类的init
            self.name = name
    
        def run(self):  # 必须是run
            print(f'{self.name} is running')
            time.sleep(9)
            print(f'{self.name} is done')
            
    if __name__ == '__main__':
        p = MyProcess('子进程')
        p.start()
        print('===>主进程')
    
  2. 验证进程之间的空间隔离

    from multiprocessing import Process
    import time
    x = 1000
    def task():
        global x
        x = 2
    
    if __name__ == '__main__':
        p1 = Process(target=task)
        p1.start()
        time.sleep(3)
        print('主进程:', x)
    # 验证了数据隔离
    # 只有数字-5~256,True,False在不同进程之间驻留,沿用同一个内存地址
    
  3. 进程对象的join方法

    from multiprocessing import Process
    import time
    def task():
        print('子进程开始')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == '__main__':
        p1 = Process(target=task)
        p1.start()
        time.sleep(4)
        print(' in 主进程')
    
    # 上面的版本虽然达到了目的,但是真正的生产环境中,子进程结束的时间不定,需要用到join
    -------------------------------------------------------
    from multiprocessing import Process
    import time
    def task():
        print('子进程开始')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == '__main__':
        p1 = Process(target=task)
        p1.start()
        p1.join()  # 告知主进程,p1进程结束之后,主进程再运行
        print(' in 主进程')
    -------------------------------------------------------
    # 开启多个子进程去验证
    from multiprocessing import Process
    import time
    def task(n):
        print(f'{n}子进程开始')
        time.sleep(n)
        print(f'{n}子进程结束')
    
    if __name__ == '__main__':
        time_start = time.time()
        p1 = Process(target=task, args=(1,))
        p2 = Process(target=task, args=(2,))
        p3 = Process(target=task, args=(3,))
        p1.start()
        p2.start()
        p3.start()
        p1.join()
        p2.join()
        p3.join()
        print(f'主进程在{time.time() - time_start}后执行')
    # 验证了如此写join并不是串行,主进程会等最长的结束再执行
    # p1,p2,p3谁先运行并不确定
    ------------------------------------------------------
    # 下面的结果是什么? 为什么?
    from multiprocessing import Process
    import time
    def task(n):
        print(f'{n}子进程开始')
        time.sleep(n)
        print(f'{n}子进程结束')
    
    if __name__ == '__main__':
        time_start = time.time()
        p1 = Process(target=task, args=(1,))
        p2 = Process(target=task, args=(2,))
        p3 = Process(target=task, args=(3,))
        p1.start()
        p1.join()
        print(111)
        p2.start()
        p2.join()
        print(222)
        p3.start()
        p3.join()
        print(333)
        print(f'主进程在{time.time() - time_start}后执行')
    # p1,p2,p3是串行
    # join是阻塞,但是只会阻塞当前主进程,不会阻塞子进程的运行
    ------------------------------------------------------from multiprocessing import Process
    import time
    def task(n):
        print(f'{n}子进程开始')
        time.sleep(n)
        print(f'{n}子进程结束')
    
    if __name__ == '__main__':
        time_start = time.time()
        lst = []
        for el in range(1, 4):
            p = Process(target=task, args=(el,))
            lst.append(p)
            p.start()
        for p in lst:
            p.join()
        print(f'主进程在{time.time() - time_start}后执行')
    
  4. 进程对象的其他属性

    from multiprocessing import Process
    import time
    def task():
        print('子进程开始')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == '__main__':
        p1 = Process(target=task, name='任务1')   # name给进程对象设置name属性
        p1.start()
        print(p1.pid)  # 获取进程的pid号
        print(p1.name)
        p1.terminate()  # 终止(杀死)子进程,强制结束
        # terminate 与 start一样的工作原理:
        # 都是通知操作系统终止或开启一个子进程,内存中终止或开启,
        # 这是需要耗费时间的
        time.sleep(1)
        print(p1.is_alive())   # 判断子进程是否存活
        # is_alive 只是查看内存中子进程是否在运行
        print('In 主进程')
    
  5. 僵尸进程和孤儿进程

    • 只存在于UNIX系统中,windows中没有此概念

    • 产生背景:

      由于子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束.那么会不会因为父进程太忙来不及wait子进程,或者说不知道子进程什么时候结束,而丢失子进程结束时的状态信息呢?不会.因为UNⅨ提供了一种机制可以保证只要父进程想知道子进程结束时的状态信息,就可以得到.这种机制就是: 在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等.但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等).直到父进程通过wait/waitpid来取时才释放.但这样就导致了问题,如果进程不调用wait/waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵尸进程,将因为没有可用的进程号而导致系统不能产生新的进程.此即为僵尸进程的危害,应当避免.

    • 僵尸进程:

      一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait()或waitpid()获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中.这种进程称之为僵死进程

    • 任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理.这是每个子进程在结束时都要经过的阶段.如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”.如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态.如果父进程在子进程结束之前退出,则子进程将由init接管.init将会以父进程的身份对僵尸状态的子进程进行处理.

    • 解决办法: 子进程结束后向父进程发送SIGCHILD信号,在信号处理函数中wait

    • 孤儿进程:

      一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程.孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作

    • 孤儿进程是无害的,因为一段时间后init会在他们运行结束后回收他们

  6. 守护进程

    • 守护进程会在主进程代码执行结束后就终止
    • 守护进程内无法再开启子进程
    # 子进程对父进程可以进行守护
    from multiprocessing import Process
    import time
    import os
    def task():
        print(f'子进程开始了:{os.getpid()}')
        time.sleep(50)
        print('子进程结束')
    
    if __name__ == '__main__':
        p1 = Process(target=task)
        p1.daemon = True  # 将p1子进程设置成守护进程,守护主进程
        # 只要主进程结束,子进程无论执行到哪,都会马上结束
        # 一定要在start之前
        p1.start()
        print(f'主进程开始了:{os.getpid()}')
    

四, 互斥锁

  • 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

  • 并发运行,效率高,但竞争同一打印终端,带来了打印错乱

    from multiprocessing import Process
    import time
    import random
    def task1():
        print('task1开始打印')
        time.sleep(random.randint(1, 3))
        print('task1打印完成')
    
    def task2():
        print('task2开始打印')
        time.sleep(random.randint(1, 3))
        print('task2打印完成')
    
    def task3():
        print('task3开始打印')
        time.sleep(random.randint(1, 3))
        print('task3打印完成')
    
    if __name__ == '__main__':
        p1 = Process(target=task1)
        p2 = Process(target=task2)
        p3 = Process(target=task3)
        p1.start()
        p2.start()
        p3.start()
    
    # 多个进程共抢一个资源,你要是做到结果第一位,效率第二位
    # 此时应该牺牲效率,保求结果,串行.
    
  • 串行: 保证了顺序,但是没有实现公平

    # 版本二,串行
    from multiprocessing import Process
    import time
    import random
    def task1():
        print('task1开始打印')
        time.sleep(random.randint(1, 3))
        print('task1打印完成')
    
    def task2():
        print('task2开始打印')
        time.sleep(random.randint(1, 3))
        print('task2打印完成')
    
    def task3():
        print('task3开始打印')
        time.sleep(random.randint(1, 3))
        print('task3打印完成')
    
    if __name__ == '__main__':
        p1 = Process(target=task1)
        p2 = Process(target=task2)
        p3 = Process(target=task3)
        p1.start()
        p1.join()
        p2.start()
        p2.join()
        p3.start()
        p3.join()
    # 虽然说上面的版本完成了串行结果,保证了顺序,但是没有实现公平
    # 问题: 顺序事先设定,我们要做到他们公平的去抢占打印机资源,谁先抢到,先执行谁
    
  • 加锁: 由并发变成了串行,牺牲了运行效率,但避免了竞争

    # 版本三,锁
    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    def task1(lock):
        lock.acquire()
        print('task1开始打印')
        time.sleep(random.randint(1, 3))
        print('task1打印完成')
        lock.release()
    
    def task2(lock):
        lock.acquire()
        print('task2开始打印')
        time.sleep(random.randint(1, 3))
        print('task2打印完成')
        lock.release()
    
    def task3(lock):
        lock.acquire()
        print('task3开始打印')
        time.sleep(random.randint(1, 3))
        print('task3打印完成')
        lock.release()
    if __name__ == '__main__':
        lock = Lock()
        p1 = Process(target=task1, args=(lock, ))
        p2 = Process(target=task2, args=(lock, ))
        p3 = Process(target=task3, args=(lock, ))
        p1.start()
        p2.start()
        p3.start()
    
    # 上锁: 一定要是同一把锁,上锁一次,解锁一次
    
  • 死锁: 上锁一次,又上锁一次

    # 死锁: 上锁一次,又上锁一次
    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    def task1(lock):
        lock.acquire()
        lock.acquire()
        print('task1开始打印')
        time.sleep(random.randint(1, 3))
        print('task1打印完成')
        lock.release()
    
    def task2(lock):
        lock.acquire()
        print('task2开始打印')
        time.sleep(random.randint(1, 3))
        print('task2打印完成')
        lock.release()
    
    def task3(lock):
        lock.acquire()
        print('task3开始打印')
        time.sleep(random.randint(1, 3))
        print('task3打印完成')
        lock.release()
    if __name__ == '__main__':
        lock = Lock()
        p1 = Process(target=task1, args=(lock, ))
        p2 = Process(target=task2, args=(lock, ))
        p3 = Process(target=task3, args=(lock, ))
        p1.start()
        p2.start()
        p3.start()
    
  • 验证: 上锁后,遇到IO阻塞cup也会切换,但是发现需要同一把锁就不会执行

    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    def task1(lock):
        lock.acquire()
        print('task1开始打印')
        time.sleep(random.randint(1, 3))
        print('task1打印完成')
        lock.release()
    
    def task2(lock):
        print('task2')
        lock.acquire()
        print('task2开始打印')
        time.sleep(random.randint(1, 3))
        print('task2打印完成')
        lock.release()
    
    def task3(lock):
        print('task3')
        lock.acquire()
        print('task3开始打印')
        time.sleep(random.randint(1, 3))
        print('task3打印完成')
        lock.release()
    if __name__ == '__main__':
        lock = Lock()
        p1 = Process(target=task1, args=(lock, ))
        p2 = Process(target=task2, args=(lock, ))
        p3 = Process(target=task3, args=(lock, ))
        p1.start()
        p2.start()
        p3.start()
    
  • 模拟抢票系统: 文件版

    # db文件内容: {"count": 80}
    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    import json
    import os
    
    def search():
        time.sleep(random.random())
        dic = json.load(open('db'))
        print(f'剩余票数:{dic["count"]}')
    
    def get():
        dic = json.load(open('db'))
        time.sleep(random.random())
        if dic['count'] > 0:
            dic['count'] -= 1
            time.sleep(random.random())
            json.dump(dic, open('db', 'w'))
            print(f'{os.getpid()}用户购票成功')
        else:
            print('没票了......')
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    if __name__ == '__main__':
        lock = Lock()
        for i in range(100):
            p = Process(target=task, args=(lock,))
            p.start()
    
  • 互斥锁与join的区别与共同点

    • 共同点: 都完成了进程之间的串行
    • 区别: join人为控制的进程串行,互斥锁是随机的抢占资源,保证了公平性
  • 思考

    # 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全.
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    # 因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题.这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道.
    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性
    

五, 队列

  • 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

  • 队列就是存在于内存中的一个容器,最大的一个特点:队列的特性就是FIFO,完全支持先进先出的原则.

  • 创建队列的类(底层就是以管道和锁定的方式实现):

    • Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递.

    • maxsize是队列中允许最大项数,省略则无大小限制.

  • 方法介绍:

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数: blocked和timeout.如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间.如果超时,会抛出Queue.Full异常.如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
2 q.get方法可以从队列读取并且删除一个元素.同样,get方法有两个可选参数: blocked和timeout.如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常.如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3 q.get_nowait(): 同q.get(False)
4 q.put_nowait(): 同q.put(False)
5 q.empty(): 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
6 q.full(): 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走
7 q.qsize(): 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
  • 利用队列进行进程之间通信: 简单,方便,不用自己手动加锁,队列自带阻塞,可持续化取数据

    from multiprocessing import Queue
    q = Queue(3)  # 可以设置元素个数
    
    def func():
        print('in func')
    q.put('asd')
    q.put({'count': 1})
    q.put(func)
    # q.put(func)  # 当队列的数据已经达到上限,再插入数据的时候,程序就会夯住
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())  # 当队列的数据已经取空,再取数据的时候,程序就会夯住
    ---------------------------------------------------
    # 模拟一个实例
    # 小米: 抢手环4  预发售10个
    # 有100个人去抢
    from multiprocessing import Process
    from multiprocessing import Queue
    import os
    def tack(q):
        try:
            q.put(f'{os.getpid()}用户', block=False)
        except Exception:
            return False
    
    if __name__ == '__main__':
        q = Queue(10)
        for i in range(100):
            p = Process(target=tack, args=(q,))
            p.start()
            
        for el in range(1, 11):
            print(f'第{el}个用户:', q.get())
    
  • 生产者消费者模型

    • 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度.

    • 为什么要使用生产者和消费者模式
      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式.

    • 基于队列实现生产者消费者模型

      # 以吃包子举例,厨师生产出包子,不可能塞你嘴里,放在一个盆中
      # 三个主体: (生产者)厨师,(容器队列)盆,(消费者)吃包子的人
      # 如果没有容器,生产者与消费者强耦合型,所以要有一个容器,缓冲区.平衡了生产力与消费力
      # 生产者消费者模型多应用与并发
      from multiprocessing import Process
      from multiprocessing import Queue
      import time
      import random
      import os
      
      def producer(q):
          for i in range(1, 11):
              res = f'包子{i}'
              time.sleep(random.randint(1, 3))
              q.put(res)
              print(f'生产者{os.getpid()}制作了{res}')
      
      def consumer(q):
          while 1:
              try:
                  res = q.get(timeout=4)
                  time.sleep(random.randint(1, 3))
                  print(f'消费者{os.getpid()}吃了{res}')
              except Exception:
                  break
      
      if __name__ == '__main__':
          q = Queue()
          p1 = Process(target=consumer, args=(q,))
          p2 = Process(target=producer, args=(q,))
          p1.start()
          p2.start()
      
      # 生产者消费者模型:
      # 合理的调控多个进程去生产数据以及提取数据,中间有个必不可少的环节(容器队列)
      

六, 管道

进程间通信(IPC)方式二: 管道

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')

# 基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)
# 但是,管道是有问题的,管道会造成数据的不安全,官方给予的解释是管道有可能会造成数据损坏。

七, 进程池和multiprocessing.Pool

  • 进程池的概念:

  • 定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

  • multiprocessing.Pool模块:

    创建进程池的类,如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程,提高操作系统效率,减少空间的占用等.

    Pool(numprocess, initializer, initargs):创建进程池
    numprocess: 要创建的进程数,如果省略,将默认使用cpu_count()的值
    initializer: 是每个工作进程启动时要执行的可调用对象,默认为None
    initargs: 是要传给initializer的参数组
    # 常用方法
    1. p.apply(func, args, kwargs): 在一个池工作进程中执行func(*args, **kwargs),然后返回结果.需要强调的是: 此操作并不会在所有池工作进程中并执行func函数,如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
    
    2. p.apply_async(func, args, kwargs): 在一个池工作进程中执行func(*args,**kwargs),然后返回结果,异步发布
    
    3. p.close(): 关闭进程池,防止进一步操作.如果所有操作持续挂起,它们将在工作进程终止前完成
    4. P.jion(): 等待所有工作进程退出.此方法只能在close()或terminate()之后调用
    
    方法apply_async()和map_async()的返回值是AsyncResul的实例obj.实例具有以下方法:
    1. obj.get(): 返回结果,如果有必要则等待结果到达.timeout是可选的,如果在指定时间内还没有到达,将引发异常.如果远程操作中引发了异常,它将在调用此方法时再次被引发.
    obj.ready(): 如果调用完成,返回True
    obj.successful(): 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    obj.wait([timeout]): 等待结果变为可用
    obj.terminate(): 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作.如果p被垃圾回收,将自动调用此函数
    
  • 进程池的简单应用,与多进程的效率对比

    from multiprocessing import Pool
    from multiprocessing import Process
    import time
    def func(n):
        print(n)
    def func2(n):
        for i in range(3):
            print(n - 1)
    if __name__ == '__main__':
        start_time = time.time()
        p = Pool(5)  # 创建含有5个进程的进程池
        # p.map(func, range(100))  # 异步调用进程,开启100个任务,map自带join的功能
        # map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等.
        # 如果想做其他的参数之类的操作,需要用后面的方法。
        for i in range(100):
            p.apply_async(func, (i,))
        p.close()
        p.join()
        print('耗时:', time.time() - start_time)
        # 耗时: 0.24491620063781738
        start_time = time.time()
        p_lst = []
        for i in range(100):
            p = Process(target=func, args=(i,))
            p_lst.append(p)
            p.start()
        for i in p_lst:
            i.join()
        print('耗时:', time.time() - start_time)
        # 耗时: 4.492773771286011
    
  • 同比,异步调用:

    import os, time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' % os.getpid())
        time.sleep(1)
        return n**2
    
    if __name__ == '__main__':
        p = Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    ------------------------------------------------------
    # 同步调用
        p_lst = []
        for i in range(10):
            res = p.apply(work, (i,))
            # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
            # 但不管该任务是否存在阻塞,同步调用都会在原地等着
            p_lst.append(res)
        print(res_l)
    ------------------------------------------------------
    # 异步调用
        p_lst = []
        for i in range(10):
            res = p.apply_async(work, (i,))
            # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了.
            # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
            # 需要注意的是,进程池中的三个进程不是同时开启或者同时结束
            # 而是执行完一个就释放一个进程,这个进程就去接收新的任务
            p_lst.append(res)
        # 异步apply_async用法: 如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()  # 不是关闭进程池,而是结束进程池接收任务,确保没有新任务再提交过来
        p.join()   # 感知进程池中的任务已经执行结束,只有当没有新的任务添加进来的时候,才能感知到任务结束了,所以在join之前必须加上close方法
        for i in p_lst:
            print(i.get())
        # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    
  • 回调函数:

    回调函数的形参只有一个,如果执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着执行函数的所有返回值

    # requests简单介绍
    import requests
    response = requests.get('http://www.baidu.com')
    print(response)
    print(response.status_code) # 200正常,404找不到网页,503等5开头的是网站内部错误
    print(response.content.decode('utf-8'))
    -------------------------------------------------------
    import requests
    from multiprocessing import Pool
    import os
    def get_page(url):
        print(f'{os.getpid()}进程在爬取:{url}')
        respone = requests.get(url)
        if respone.status_code == 200:
            return url, respone.text
    
    def pasrse_page(res):
        print(f'{os.getpid()}进程在在分析:{res[0]}的结果')
        print(f'{res[0]}的结果:{len(res[1])}')
    
    if __name__ == '__main__':
        urls = [
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        p = Pool(3)
        res_lst = []
        for url in urls:
            res = p.apply_async(get_page, args=(url,), callback=pasrse_page)
            res_lst.append(res)
        p.close()
        p.join()
    
  • 如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

    from multiprocessing import Pool
    import time,random,os
    
    def work(n):
        time.sleep(1)
        return n**2
    if __name__ == '__main__':
        p = Pool(5)
    
        res_l = []
        for i in range(10):
            res = p.apply_async(work, args=(i,))
            res_l.append(res)
    
        p.close()
        p.join() # 等待进程池中所有进程执行完毕
    
        nums = []
        for res in res_l:
            nums.append(res.get()) # 拿到所有结果
        print(nums) # 主进程拿到所有的处理结果,可以在主进程中进行统一进行处理
    
    
  • 进程池和信号量的区别:

    • 进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己,而信号量是一堆进程等待着去执行一段逻辑代码
    • 信号量不能控制创建多少个进程,但是可以控制同时多少个进程能够执行,但是进程池能控制你可以创建多少个进程
原文地址:https://www.cnblogs.com/zyyhxbs/p/11233251.html