Python多进程的简单笔记。

首先是通过os.fork创建多进程:

参考链接:

https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064

https://www.cnblogs.com/Lin-Yi/p/7360855.html

import os
print('主进程的pid:',os.getpid())
pid1 = os.fork()
print('子进程pid1:',pid1, os.getpid(),'',os.getppid())
pid2 = os.fork()
print('孙子进程pid2:',pid2,os.getpid(),'',os.getppid())
print(123)
主进程的pid: 44648
子进程pid1: 44649 44648  1670
孙子进程pid2: 44650 44648  1670
123
子进程pid1: 0 44649  44648
孙子进程pid2: 0 44650  44648
123
孙子进程pid2: 44651 44649  44648
123
孙子进程pid2: 0 44651  44649
123

 上面的代码,主要是分析os.fork创建子进程的方式,代码里面我两次os.fork进行了自进程的创建,打印了4次123,从进程编号可以看出来了。

但pid1或pid2输出为0的时候,说明子进程或者孙子进程在工作。从打印出来的消息可以看出pid 44648创建了儿子进程44649与孙子44650,儿子44649创建了儿子44651。

所以这里一共有4个进程并行在跑。

os.fork虽然创建进程非常方便,但网上的资料也比较少,我看了很久也只琢磨了一点。重要一点,子进程必须要再父进程之前完成,后面我简单写一个。

import os
import time

print(f'主进程:{os.getpid()}')
pid = os.fork()
if pid:
    print(f'主进程在运行{os.getpid()},子进程为{pid}')
else:
    # time.sleep(1)
    print(f'子进程{os.getpid()}在运行,父进程为{os.getppid()}')
主进程:44723
主进程在运行44723,子进程为44724
子进程44724在运行,父进程为44723
import os
import time

print(f'主进程:{os.getpid()}')
pid = os.fork()
if pid:
    print(f'主进程在运行{os.getpid()},子进程为{pid}')
else:
    time.sleep(1)
    print(f'子进程{os.getpid()}在运行,父进程为{os.getppid()}')
主进程:44726
主进程在运行44726,子进程为44727

 从运行可以看出,当子进程休息1秒的时间,主进程已经结束,主进程一旦结束,子进程将不在执行,由于该模块用的比较少,我就不仔细研究如何子进程守护了。

后面主要介绍multiprocessing的使用,及通过Queue进行通讯。

subprocess(独立开辟一个进程执行一些os的操作,暂时用不到,参考链接。)

参考链接:https://www.520mwx.com/view/62337

subprocess模块用于执行系统命令<!--more-->,其实有一个模块也支持执行系统命令,那个模块就是sys.system,但他执行系统命令会直接通过主进程去执行命令,那假如,该命令的执行需要耗费一个小时,那么主进程会卡一个小时,而不会去干别的事,这样就会导致程序的运行效率低下。

如果由subprocess执行系统命令的时候并不会让主进程去执行,而是主进程会开辟出一个子进程去执行,并不会影响到主进程的运行,主进程该干嘛就干嘛,那么又有个问题,大家都知道进程之间的内存空间是独立的,也就是说进程之间是不能相互访问的,那么在subprocess中,有个管道的概念,既然固定死了进程之间不能相互访问,那么可以将执行命令的结果输出到管道里,该管道其实就是一块共享的内存空间,可以让主进程获取到该共享内存空间存放的数据

参考:https://www.cnblogs.com/jiangfan95/p/11439207.html(感觉写的很仔细,比廖大的详细)。

 创建进程的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
复制代码
复制代码

1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'anne',)
4 kwargs表示调用对象的字典,kwargs={'name':'anne','age':18}
5 name为子进程的名称

复制代码
复制代码

创建并开启进程的两种方法

第一种通过实例化的方式。

import random
import time
import os
from multiprocessing import Process

def demo(name):
time.sleep(random.random())
print(f'进程:{name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')


for i in range(5):
t = Process(target=demo,args=(i,),name=f'进程{i}') # 参数为元祖
print(t.name)
t.start()

print(f'主进程为{os.getpid()}')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n1.py
进程0
进程1
进程2
进程3
进程4
主进程为44853
进程:4正在执行,pid为44858, 父pid为44853
进程:2正在执行,pid为44856, 父pid为44853
进程:1正在执行,pid为44855, 父pid为44853
进程:0正在执行,pid为44854, 父pid为44853
进程:3正在执行,pid为44857, 父pid为44853

Process finished with exit code 0

 第二种通过继承的方法编写多进程。

import random
from multiprocessing import Process
import time
import os

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess, self).__init__()
        self.name = f'我是进程{name}'

    def run(self) -> None:
        time.sleep(random.random())
        print(f'进程:{self.name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')


for i in range(5):
    t = MyProcess('a' + str(i))
    print(t.name)
    t.start()


print(f'主进程为{os.getpid()}')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n2.py
我是进程a0
我是进程a1
我是进程a2
我是进程a3
我是进程a4
主进程为44956
进程:我是进程a1正在执行,pid为44958, 父pid为44956
进程:我是进程a4正在执行,pid为44961, 父pid为44956
进程:我是进程a2正在执行,pid为44959, 父pid为44956
进程:我是进程a0正在执行,pid为44957, 父pid为44956
进程:我是进程a3正在执行,pid为44960, 父pid为44956

Process finished with exit code 0

 join让子进程产生阻塞,默认join为无限阻塞,但可以传入时间,设置阻塞的超时时间。

import random
from multiprocessing import Process
import time
import os

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess, self).__init__()
        self.name = f'我是进程{name}'

    def run(self) -> None:
        time.sleep(random.random())
        print(f'进程:{self.name}正在执行,pid为{os.getpid()}, 父pid为{os.getppid()}')

l_process = []
for i in range(5):
    t = MyProcess('a' + str(i))
    print(t.name)
    t.start()
    l_process.append(t)

for m in l_process:
    m.join()         # join可以等待子进程结束后再继续往下运行,通常用于进程间的同步。


print(f'主进程为{os.getpid()}')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n2.py
我是进程a0
我是进程a1
我是进程a2
我是进程a3
我是进程a4
进程:我是进程a4正在执行,pid为45013, 父pid为45008
进程:我是进程a1正在执行,pid为45010, 父pid为45008
进程:我是进程a0正在执行,pid为45009, 父pid为45008
进程:我是进程a3正在执行,pid为45012, 父pid为45008
进程:我是进程a2正在执行,pid为45011, 父pid为45008
主进程为45008

Process finished with exit code 0

 这里上一个daemon守护进程的实例,还有如何在运行的进程中,获取进程的名称:

#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from multiprocessing import current_process
import time

def foo():
    print(current_process().name)     # 调试的时候,通过current_process().name返回进程的姓名
    print(123)
    time.sleep(1)
    print("end123")                   # 没有输出

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo,name='foo')      # 给予姓名
p2=Process(target=bar)

p1.daemon=True                    # 默认为False作为非守护进程,设置为True为守护进程。守护进程会在主程序退出之前自动终止。
p1.start()
p2.start()
time.sleep(0.5)                 # 阻塞主进行0.5秒,要不然,p1关闭了进程保护,都没机会执行任何代码。
print("main-------") 
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n3.py
foo
123
456
main-------
end456

Process finished with exit code 0

终止进程,terminate终止进程。

import multiprocessing
import time

def foo():
    print('Starting worker')
    time.sleep(1)
    print('Finished worker')


t1 = multiprocessing.Process(target=foo)
t1.start()
print(t1,'is_alive',t1.is_alive())
t1.terminate()
# time.sleep(0.2)            # 手动设置阻塞也可以,但决定是join好。
print(t1,'is_alive',t1.is_alive())
t1.join()            # 终止进程要使用Join等待进程退出,使进程有足够的时间更新对象的状态,可以反应进程状态
print(t1,'is_alive',t1.is_alive())
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n4.py
<Process(Process-1, started)> is_alive True
<Process(Process-1, started)> is_alive True
<Process(Process-1, stopped[SIGTERM])> is_alive False

Process finished with exit code 0

 多个进程操作同个资源,也会如果不加锁,可能会引发冲突。

import multiprocessing
import time
import random

lock = multiprocessing.Lock()

def work():
    print(multiprocessing.current_process().name + '开始正在工作')
    time.sleep(random.random())
    print(multiprocessing.current_process().name + '已经结束工作')

def farm():
    print(multiprocessing.current_process().name + '开始正在工作')
    time.sleep(random.random())
    print(multiprocessing.current_process().name + '已经结束工作')

for i in range(5):
    t1 = multiprocessing.Process(target=work, name=f'工人{i}')
    t2 = multiprocessing.Process(target=farm, name=f'农民{i}')
    t1.start()
    t2.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n5.py
工人0开始正在工作
农民0开始正在工作
工人1开始正在工作
农民1开始正在工作
工人2开始正在工作
农民2开始正在工作
工人3开始正在工作
农民3开始正在工作
农民3已经结束工作
工人4开始正在工作
农民4开始正在工作
工人2已经结束工作
农民0已经结束工作
农民4已经结束工作
农民1已经结束工作
工人0已经结束工作
工人3已经结束工作
工人1已经结束工作
农民2已经结束工作
工人4已经结束工作

Process finished with exit code 0

 上面是不加锁的情况下,工作会进行穿插。

import multiprocessing
import time
import random

lock = multiprocessing.Lock()

def work():
    with lock:    # 用with写,不用怕死锁
        print(multiprocessing.current_process().name + '开始正在工作')
        time.sleep(random.random())
        print(multiprocessing.current_process().name + '已经结束工作')

def farm():
    lock.acquire()
    try:          # 用try写。
        print(multiprocessing.current_process().name + '开始正在工作')
        time.sleep(random.random())
        print(multiprocessing.current_process().name + '已经结束工作')
    finally:
        lock.release()

for i in range(5):
    t1 = multiprocessing.Process(target=work, name=f'工人{i}')
    t2 = multiprocessing.Process(target=farm, name=f'农民{i}')
    t1.start()
    t2.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n5.py
工人0开始正在工作
工人0已经结束工作
农民0开始正在工作
农民0已经结束工作
工人1开始正在工作
工人1已经结束工作
农民1开始正在工作
农民1已经结束工作
工人2开始正在工作
工人2已经结束工作
农民2开始正在工作
农民2已经结束工作
工人3开始正在工作
工人3已经结束工作
农民3开始正在工作
农民3已经结束工作
工人4开始正在工作
工人4已经结束工作
农民4开始正在工作
农民4已经结束工作

Process finished with exit code 0

 可以看出来,加了锁的,农民或工人只有一个能工作,另外一个必须休息,另外只有一把锁,虽然这样安全,但效率真的很低。

进程间通信

虽然可以用文件共享数据实现进程间通信,但问题是:

1)效率低(共享数据基于文件,而文件是硬盘上的数据) 2)需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1)效率高(多个进程共享一块内存的数据)2)帮我们处理好锁问题。

这样看来进程间的通讯还是比较多,可以通过socket进行通讯,还可以文件共享数据,最后也是最方便的是队列,管道,其实就是共享内存。

mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

1 队列和管道都是将数据存放于内存中

2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性

简单的一些使用介绍

import multiprocessing

q = multiprocessing.Queue(maxsize=3)
q.put('1')    # 放如元素
print(q.empty())  # 查看容器是否有空间
q.get()          # 取出元素
q.get(block=True)     # 设置区块,如果为False取不到数据就报错

 一个简单的生产者,消费者关系:

import multiprocessing
import time


def producer(q):
    for i in ['鸡腿','duck', 'milk', 'egg']:
        time.sleep(1)
        print('put the',i)
        q.put(i)


def consumer(q):
    while True:
        print('eat:', q.get(timeout=3))    # 3秒取不到报错


q = multiprocessing.Queue()

t1 = multiprocessing.Process(target=producer,args=(q,))
t2 = multiprocessing.Process(target=consumer,args=(q,))
t1.start()
t2.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n6.py
put the 鸡腿
eat: 鸡腿
put the duck
eat: duck
put the milk
eat: milk
put the egg
eat: egg
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/shijianzhong/study/multi_processing/n6.py", line 14, in consumer
    print('eat:', q.get(timeout=3))
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 105, in get
    raise Empty
_queue.Empty

Process finished with exit code 0
import multiprocessing
import time


def producer(q):
    for i in ['鸡腿','duck', 'milk', 'egg']:
        time.sleep(1)
        print('put the',i)
        q.put(i)
    q.put(None)

def consumer(q):
    while True:
        if q.get() is None:     # 加一个信号,停止条件
            break
        print('eat:', q.get())    # 3秒取不到报错


q = multiprocessing.Queue()

t1 = multiprocessing.Process(target=producer,args=(q,))
t2 = multiprocessing.Process(target=consumer,args=(q,))
t1.start()
t2.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n6.py
put the 鸡腿
put the duck
eat: duck
put the milk
put the egg
eat: egg

Process finished with exit code 0

 通过在队列里面放如一个截止信号,可以让生产者知道后,结束进程。

也可以通过JoinableQueue来实现当生产者停止后,消费者也停止,主要原理是通过阻塞队列来实现。

import multiprocessing
import time
import random


def producer(name, q):
    for i in ['鸡腿', 'duck', 'milk', 'egg']:
        time.sleep(random.random())
        print(name, 'put the', i)
        q.put(name + ':' + i)
    q.join()                      # 执行到这一步说明,生产加工已经完成,阻塞队列。


def consumer(name, q):
    while True:
        time.sleep(random.random())
        print(name, 'eat:', q.get())  # 取数据
        q.task_done()               # 取一次数据,向队列产生一次取出的信号。


q = multiprocessing.JoinableQueue()
p1 = multiprocessing.Process(target=producer, args=('xm001', q,))
p2 = multiprocessing.Process(target=producer, args=('xm002', q,))
p3 = multiprocessing.Process(target=producer, args=('xm001', q,))

t1 = multiprocessing.Process(target=consumer, args=('xibai01', q,))
t2 = multiprocessing.Process(target=consumer, args=('xibai02', q,))

t1.daemon = True    # 进程守护打开,消费者的停止,主要就是因为主进程的停止而停止。
t2.daemon = True
for i in (p1, p2, p3, t1, t2):
    i.start()

p1.join()              # 先阻塞生产者的主函数,函数里面再阻塞队列,等两次阻塞都通过以后,执行主函数,主函数执行后,守护的进程退出。
p2.join()
p3.join()

print('main')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n7.py
xm001 put the 鸡腿
xibai02 eat: xm001:鸡腿
xm002 put the 鸡腿
xibai01 eat: xm002:鸡腿
xm002 put the duck
xibai01 eat: xm002:duck
xm001 put the duck
xm001 put the 鸡腿
xibai01 eat: xm001:duck
xibai02 eat: xm001:鸡腿
xm001 put the milk
xibai02 eat: xm001:milk
xm001 put the egg
xibai01 eat: xm001:egg
xm002 put the milk
xibai02 eat: xm002:milk
xm001 put the duck
xibai01 eat: xm001:duck
xm002 put the egg
xibai02 eat: xm002:egg
xm001 put the milk
xibai02 eat: xm001:milk
xm001 put the egg
xibai01 eat: xm001:egg
main

Process finished with exit code 0

2. 管道

创建管道的类:

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

Queue就是基础Pipe的,这个我就先不写了。

3. 共享数据

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

还可以扩展到分布式系统中

进程间通信应该尽量避免使用本节所讲的共享数据的方式

import multiprocessing

def demo(l):
    l.pop()                # 没有加锁,数据处理还是可以的,安全起见,可以加锁。


lock = multiprocessing.Lock()
arg = multiprocessing.Manager()
l_all = arg.list(range(20))        # 可以做字典,我这里做的列表
print(l_all)

l_arry = []
for i in range(10):
    t = multiprocessing.Process(target=demo, args=(l_all,))
    t.start()
    l_arry.append(t)
for i in l_arry:
    i.join()          # 必须阻塞,要不然主程序结束了,数据没了。


print(l_all)
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n9.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

控制资源的并发访问,有时候可能需要允许多个工作进程同时访问一个资源,但要限制总数。列如,连接池支持同时连接,但数目可能是固定的,或者一个网络应用可能支持固定数目的并发下载。

这个时候就可以用Semaphore

import random
import multiprocessing
import time

class ActivePool:

    def __init__(self):            
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()        # 建立一个模型,开虚拟连接池,这里用列表
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:                   # 为了防止数据错误,全部加锁了
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s , pool):
    name  = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)                    # 通过s来限制进入的线程,进来的线程操作了都是全共享数据列表
        print(f'Activating {name} now running {pool}')
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    # 初始化以后开始工作
    jobs = [multiprocessing.Process(target=worker, name=str(i), args=(s, pool)) for i in range(10)]
    for i in jobs:
        i.start()

    while True:
        alive = 0
        for j in jobs:
            # 死循环这个工作任务列表里面的任务,查看是否有任务,有任务就打印出来。
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print(f'Now running{pool}')
        # 没有任务的话,就结束这个循环。
        if alive == 0:
            break
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n10.py
Activating 0 now running ['0', '1']
Activating 1 now running ['0', '1', '2']
Activating 2 now running ['0', '1', '2']
Activating 3 now running ['0', '1', '3']
Now running['0', '1', '3']
Now running['0', '1', '3']
Now running['0', '1', '3']
Now running['0', '1', '3']
Now running['0', '1', '3']
Now running['0', '1', '3']
Activating 4 now running ['0', '3', '4']
Now running['0', '3', '4']
Activating 5 now running ['3', '4', '5']
Now running['3', '4', '5']
Activating 6 now running ['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Now running['4', '5', '6']
Activating 7 now running ['5', '6', '7']
Activating 8 now running ['6', '7', '8']
Now running['6', '7', '8']
Now running['6', '7', '8']
Activating 9 now running ['7', '8', '9']
Now running['7', '8', '9']
Now running['7', '9']
Now running['7', '9']
Now running['7', '9']
Now running['7']
Now running['7']
Now running['7']
Now running['7']
Now running['7']
Now running[]

Process finished with exit code 0

 Event类是一种简单的方法,可以在进程之间传递状态信息。事件可以在设置状态和未设置状态之间切换。通过使用一个可选的超时值,时间对象的用户等待其状态从未设置变为设置。

里面就4个参数['clear', 'is_set', 'set', 'wait']

从字面也可以很好的辩识,后面我上一个简单的代码实例。

# _*_coding:utf-8_*_
# !/usr/bin/env python

from multiprocessing import Process, Event
import time, random


def wait_for_event(e):
    print('wait_for_event: starting')
    e.wait()            # 默认e未设置,这里阻塞
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    print('wait_for_event_timeout: starting')
    e.wait(t)          # 可以设置阻塞时间
    print('wait_for_event_timeout: e.is_set()', e.is_set())

if __name__ == '__main__':
    e = Event()
    w1 = Process(target=wait_for_event, name='block', args=(e,))
    w1.start()
    w2 = Process(target=wait_for_event_timeout, name='nonblock', args=(e,2)) # 等待2秒后,阻塞将取消。
    w2.start()
    print('main:waiting before calling Event.set()')          # 
    time.sleep(3)
    e.set()       # 3秒后,给时间设置。
    print('main:event is set')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n11.py
main:waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set() False
main:event is set
wait_for_event: e.is_set()-> True

Process finished with exit code 0

 事件就像一个全局的对象,传入各个进程里面当做信号进行传输。

最后讲一个pool,进程池。

1 Pool([numprocess  [,initializer [, initargs]]]):创建进程池 

 参数介绍:

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
 

主要方法:

复制代码
1 p.apply(func [, args [, kwargs]])
在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):
在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。   
3 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
4 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
5.P.map():在功能上等价于内置的map()的结果,只不过各个任务会并行运行。(感觉跟高阶函数map基本一样,只不过这个是平行操作,返回的时候列表,高阶函数map返回的是迭代器)



  Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])


    processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数;
    initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
    maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
    context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

from multiprocessing import Process, Pool
import time
import multiprocessing


def func(msg):
    print("msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == '__main__':
    def s_time(func):
        def wrap():
            t1 = time.perf_counter()
            func()
            cost_time = time.perf_counter() - t1
            print(f'消耗事件{cost_time:0.5f}')
        return wrap


    @s_time                # 写了一个简单的装饰器,测试成勋跑的事件。
    def run():
        count = multiprocessing.cpu_count()
        pool = Pool(processes=count)
        res_l = []
        for i in range(10):
            msg = "hello %d" % (i)
            # res = pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
            res = pool.apply(func, (msg,))
            res_l.append(res)  # 同步执行,即执行完一个拿到结果,再去执行另外一个
        print("==============================>")
        pool.close()
        pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print(res_l)  # 看到的就是最终的结果组成的列表
        for i in res_l:  # apply是同步的,所以直接得到结果,没有get()方法
            # print(i.get())
            print(i)


    run()     


/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n13.py
msg: hello 0
msg: hello 1
msg: hello 2
msg: hello 3
msg: hello 4
msg: hello 5
msg: hello 6
msg: hello 7
msg: hello 8
msg: hello 9
==============================>
['hello 0', 'hello 1', 'hello 2', 'hello 3', 'hello 4', 'hello 5', 'hello 6', 'hello 7', 'hello 8', 'hello 9']
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
消耗事件1.08404

Process finished with exit code 0
from multiprocessing import Process, Pool
import time
import multiprocessing


def func(msg):
    print("msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == '__main__':
    def s_time(func):
        def wrap():
            t1 = time.perf_counter()
            func()
            cost_time = time.perf_counter() - t1
            print(f'消耗事件{cost_time:0.5f}')
        return wrap


    @s_time                # 写了一个简单的装饰器,测试成勋跑的事件。
    def run():
        count = multiprocessing.cpu_count()
        pool = Pool(processes=count)
        res_l = []
        for i in range(10):
            msg = "hello %d" % (i)
            res = pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
            # res = pool.apply(func, (msg,))
            res_l.append(res)  # 同步执行,即执行完一个拿到结果,再去执行另外一个
        print("==============================>")
        pool.close()
        pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print(res_l)  # 看到的就是最终的结果组成的列表
        for i in res_l:  # apply是同步的,所以直接得到结果,没有get()方法
            print(i.get())
            # print(i)


    run()
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n13.py
==============================>
msg: hello 0
msg: hello 1
msg: hello 2
msg: hello 3
msg: hello 4
msg: hello 5
msg: hello 6
msg: hello 7
msg: hello 8
msg: hello 9
[<multiprocessing.pool.ApplyResult object at 0x109f69450>, <multiprocessing.pool.ApplyResult object at 0x109f69590>, <multiprocessing.pool.ApplyResult object at 0x109f696d0>, <multiprocessing.pool.ApplyResult object at 0x109f69810>, <multiprocessing.pool.ApplyResult object at 0x109f69950>, <multiprocessing.pool.ApplyResult object at 0x109f69b10>, <multiprocessing.pool.ApplyResult object at 0x109f69c50>, <multiprocessing.pool.ApplyResult object at 0x109f69d90>, <multiprocessing.pool.ApplyResult object at 0x109f69ed0>, <multiprocessing.pool.ApplyResult object at 0x109f69a50>]
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
消耗事件0.17523

Process finished with exit code 0

 最后上演一个进程池的回调函数,还真好用,输入函数的返回值,直接传入了回调函数里面。

from multiprocessing import Pool
import requests
import os
import multiprocessing


def get_page(url):
    print('<进程%s> get %s' % (os.getpid(), url))        # 获取进程号,打印网址
    respone = requests.get(url)
    respone.encoding = 'utf-8'
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}        # 返回网址,网页源码


def pasrse_page(res):
    print('<进程%s> parse %s' % (os.getpid(), res['url']))    # 回调函数进程
    parse_res = 'url:<%s> size:[%s]
' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:     # 写入本地文件
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    count = multiprocessing.cpu_count()
    p = Pool(count)
    res_l = []
    for url in urls:
        res = p.apply_async(get_page, args=(url,), callback=pasrse_page)   # 异步进行操作
        res_l.append(res)

    p.close()      # 关闭池子,不允许新的进程进入
    p.join()       # 阻塞主进程
    # print([res.get() for res in res_l])
    print('main')
/usr/local/bin/python3.7 /Users/shijianzhong/study/multi_processing/n14.py
<进程48832> get https://www.baidu.com
<进程48833> get https://www.python.org
<进程48834> get https://www.openstack.org
<进程48835> get https://help.github.com/
<进程48836> get http://www.sina.com.cn/
<进程48831> parse https://www.baidu.com
<进程48831> parse http://www.sina.com.cn/
<进程48831> parse https://www.openstack.org
<进程48831> parse https://www.python.org
<进程48831> parse https://help.github.com/
main

Process finished with exit code 0

 从进程编号可以看出来,有多个进程参与了解析网页,就一个进程负责写入信息。还是非常不错的工作任务关系。



原文地址:https://www.cnblogs.com/sidianok/p/12020006.html