多进程

多进程

  同一时刻并行的处理多个任务,即为多进程。比如,你一边喝茶、看书还听着音乐。真正的并行多任务只能在多核的CPU上实现,由于任务数量是远远多于CPU的核数,所以操作系统会自动将多任务短时间轮流切换执行,给我们的感觉就像同时在执行一样。

  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。编写的代码没有运行叫程序,正在运行的代码就是进程。

fork

  Python中可以使用os模块fork()函数来创建子进程。程序执行os.fork()时,操作系统会创建一个子进程,然后复制父进程的所有信息到子进程中;调用一次os.fork()时,会返回两次值。返回给子进程的值一定是0,返回给父进程的是子进程中的pid号。返回给父进程pid(子进程的)号,是因为父进程可以fork出多个子进程,所以有必要记住子进程的pid号。

import os

print("当前进程pid= %d"% os.getpid())
num = 0
pid = os.fork()
if pid == 0:
    print("我是子进程: %s,父进程是:%s"%(os.getpid(),os.getppid()))
    num += 1
    print("num = %d"%num)
else:
    print("我是父进程:%s,我的子进程是:%s"%(os.getpid(),pid))
    num += 1
    print("num = %d"%num)

print("父、子进程都可以执行")

输出:
当前进程pid= 4262
我是父进程:4262,我的子进程是:4263
num = 1
父、子进程都可以执行
我是子进程: 4263,父进程是:4262
num = 1
父、子进程都可以执行

多进程中,每个进程都各自拥有一份,互不影响,如上num = 1。

multiprocessing

  由于fork函数存在于Linux、Unix、Mac操作系统中,Windows操作系统无fork函数调用,Python作为一个跨平台的语言,使用multiprocessing模块封装fork函数来创建多进程。multiprocessing提供一个Process类来代表一个进程对象。

Process使用:

Process([group [, target [, name [, args [, kwargs]]]]])


    target:表示这个进程实例所调用对象;

    args:表示调用对象的位置参数元组;

    kwargs:表示调用对象的关键字参数字典;

    name:为当前进程实例的别名;

    group:大多数情况下用不到;

Process类常用方法:

    is_alive():判断进程实例是否还在执行;

    join([timeout]):是否等待进程实例执行结束,或等待多少秒;

    start():启动进程实例(创建子进程);

    run():如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法;

    terminate():不管任务是否完成,立即终止;

Process类常用属性:

    name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数;

    pid:当前进程实例的PID值;

from multiprocessing import Process
import os
import time

def run_proc(name,age,**kwargs):
    for i in range(10):
        print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid()))
        print(kwargs)
        time.sleep(1)

if __name__ == '__main__':
    print('父进程 %d'%os.getpid())
    p = Process(target=run_proc,args=('MI',18),kwargs={'S':99})
    print('将要执行子进程')
    p.start()
    time.sleep(2)
    p.terminate()
    p.join()
    print('子进程结束了')

输出:
父进程 4460
将要执行子进程
子进程运行中,name= MI,age=18,pid=4461
{'S': 99}
子进程运行中,name= MI,age=18,pid=4461
{'S': 99}
子进程结束了

   Process创建子进程,只需要传入一个函数和函数参数,创建一个Process实例对象,然后用start方法启动,这样创建比fork()更简单。join()方法可以等待子进程结束后主进程继续往下执行,通常用于进程同步。

from multiprocessing import Process
import os
import time
import signal

def run_proc(name,age,**kwargs):
    for i in range(10):
        print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid()))
        print(kwargs)
        time.sleep(0.1)

if __name__ == '__main__':
    print('父进程 %d'%os.getpid())
    p = Process(target=run_proc,args=('MI',18),kwargs={'S':99})
    print('将要执行子进程')
    p.start()
    time.sleep(0.2)
    os.killpg(os.getpid(),signal.SIGKILL)  #杀死父进程
    print('子进程结束了')

输出:
父进程 3791
将要执行子进程
子进程运行中,name= MI,age=18,pid=3792
{'S': 99}
子进程运行中,name= MI,age=18,pid=3792
{'S': 99}
已杀死

  Process中主进程会默认等待子进程执行完后继续执行父进程,如果父进程退出或者异常被终止了,子进程也会退出运行。当进程中处理的事物比较复杂,一个函数不能完成时,我们可以让一个类去继承Process实现要处理的任务。

进程池Pool

  当创建的子进程不多的时候,可以使用Process动态成生多个进程,如果需要创建成百上千时,手动创建工作量大,最主要的是不断创建和删除子进程调度系统分配资源很耗时间。所以我们可以创建一个进程池,预先放一些进程进去,要用的时候就直接调用,用完之后再把进程归还给进程池,省下创建删除进程的时间,提高了效率。

multiprocessing.Pool常用函数:

  •     apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执  行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  •     apply(func[, args[, kwds]]):使用阻塞方式调用func
  •     close():关闭Pool,使其不再接受新的任务;
  •     terminate():不管任务是否完成,立即终止;
  •     join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
from multiprocessing import Pool
import os,time,random

def worker(msg):
    tStart = time.time()
    print("%s开始执行,进程号为%d"%(msg,os.getpid()))
    time.sleep(random.random()*2)
    tStop = time.time()
    print(msg,"执行完毕,耗时%0.2f"%(tStop-tStart))

if __name__ == '__main__':

    #创建一个进程池,最大进程数3
    po = Pool(3)
    for i in range(0,5):
        po.apply_async(worker,(i,))
    print('----start----')
    #关闭进程池,不再添加新的请求
    po.close()
    #必须放在close()之后,等待po中所有子进程执行完毕
    po.join()
    print('----end----')

输出:
----start----
0开始执行,进程号为4049
1开始执行,进程号为4050
2开始执行,进程号为4051
1 执行完毕,耗时0.96
3开始执行,进程号为4050
0 执行完毕,耗时0.98
4开始执行,进程号为4049
2 执行完毕,耗时1.30
4 执行完毕,耗时0.77
3 执行完毕,耗时1.11
----end----

  初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

if __name__ == '__main__':

    #创建一个进程池,最大进程数3
    po = Pool(3)
    for i in range(0,5):
        po.apply_async(worker,(i,))
    print('----start----')
    #关闭进程池,不再添加新的请求
    #po.close()
    #必须放在close()之后,等待po中所有子进程执行完毕
    #po.join()
    print('----end----')

输出:
----start----
----end----

  如果我们在进程池之后没有添加join()会导致进程池中的任务不被执行。主进程创建/添加任务后,主进程默认不会等待进程池中的任务执行完后才结束,而是当主进程的任务做完之后立马结束。

进程通信Queue

  Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。

 Queue使用:

  初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
    Queue.qsize():返回当前队列包含的消息数量;  
    Queue.empty():如果队列为空,返回True,反之False ;
    Queue.full():如果队列满了,返回True,反之False;
    Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
  1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如    果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
  2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
    Queue.get_nowait():相当Queue.get(False);
    Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
  1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾  出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
  2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
      Queue.put_nowait(item):相当Queue.put(item, False);

from multiprocessing import Process,Queue
import os,time,random


def write(q):
    print('Process to write: %s '%os.getpid())
    for value in ['A','B','C']:
        print('put %s to queue--'%value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s '%os.getpid())
    while True:
            value = q.get(True)
            print('get %s from queue '%value)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
    print('Complete reading and writing')

输出:
Process to write: 4707 
put A to queue--
Process to read: 4708 
get A from queue 
put B to queue--
get B from queue 
put C to queue--
get C from queue 
Complete reading and writing

参考:

https://blog.csdn.net/sayhello_world/article/details/72829329

https://blog.csdn.net/hello_bravo_/article/details/52528283

原文地址:https://www.cnblogs.com/jsnhdream/p/10068495.html