Python-多进程

 多进程的定义

  提高效率(增加并发数)

  进程是程序一次动态的执行过程,包括代码加载,执行,执行完毕退出阶段

  进程是系统资源分配的独立单位(最小单位)

  进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度

多进程的特性

  并发性:任何进程在操作系统中可以同时运行
  独立性:资源不共享
  异步性:进程和进程之间相互制约,进程运行有间断性

os.fork()

Unix/Linux操作系统提供了一个os.fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用os.getppid()就可以拿到父进程的ID。

import os
import time
 
def child():
    print('我是子进程,id是:%d,父进程id是:%d' % (os.getpid(),os.getppid()))
    time.sleep(30)  #当主进程结束 子进程还在运行时 子进程的ppid为 0 
    os._exit(0)  #结束子进程
def parent():
    while True:
        newpid = os.fork() #Unix/Linux操作系统才有的方法
        if newpid == 0: # 子进程
            child()
        else:
            print('我的父进程id是:%d ------ 子进程id是:%d' % (os.getpid(),newpid))
        if input() == 'q':
            break
parent()

结果为:

我是子进程,id是:10026,父进程id是:10025
我的父进程id是:10025 ------ 子进程id是:10026
multiprocessing模块 python2.6版本之后才有的,属于内置模块,不需要额外安装
  这个模块支持跨平台,支持Windows,linux
  os.fork() : 只支持linux
  注意:子进程在复制父进程资源的时候,也将父进程的执行过程(当前的执行状态)也复制进去了,所以子进程中的代码并非全部从头到尾执行
1.创建多进程
  创建格式:p = Process(target=函数名)
  启动个事:进程对象名.start()
from multiprocess import Process
Process(target=None, name=None, args=(), kwargs={}, *, daemon=False)
    target:进程工作的任务函数
    name:进程的名字
    args:以元组为形式,不定长接收 进程工作的任务函数的参数
    kwargs:以字典为形式,不定长接收 进程工作的任务函数的参数
    daemon :默认情况下为Flase,创建的进程不是主进程的守护进程,即主进程会等待各个子进程的执行
            若设为True,表示创建的子进程是主进程的守护进程,主进程执行结束后,子进程也会结束
            需要在start()之前设置

这样创建的子进程实力,只会工作target参数所对应的函数

2.创建简单多进程 及参数讲解

from multiprocessing import Process,current_process

# current_process().name 返回进程名
# current_process().pid  返回进程id

def func():
    print('我是子进程,pid为%d,进程名为%s'%(current_process().pid,current_process().name))

def main():
    p_list = []
    # 进程执行的顺序是由CPU决定的
    for i in range(3):
        p = Process(target=func,name = '进程1')
        p.start()
        p_list.append(p)
    
    # 阻塞主进程 等待所有子进程结束,主进程继续向下执行
    for v in p_list:
        v.join()  # 参数 timeout=秒数,超过多少秒就不等了,否则就一直等

        # v.is_alive()  # 判断进程是否还在运行 

        if v.is_alive() == True:
            # v.terminate() #超过指定时间,如果子进程还活着,强制关闭子进程
        

if __name__ == "__mian__":
    main()
    #判断当前的监本,如果是按照模块被别人导入了,那么这个判断下的代码不会执行

    # 在当前脚本执行 __name__ 等于 __main__
    # 当这个脚本被其他程序导入时 __name__ 等于脚本名

3.进程类 案例

从键盘输入一个整数,分别开启两个进程来计算这个数的累加和和阶乘。
(第一个进程用系统提供给我们的类,第二个进程需要自己定义)
例如:输入5
10的累加和为:15
10的阶乘为:120
代码:
#  方法一:继承Process类,复写run()方法
class JieCheng(Process):
    def __init__(self,n):
        Process.__init__(self) #需要继承Process类本身的__init__方法
        self.n = n
    def run(self): #当创建的进程对象执行.start()方法时,就会执行类的run()方法
        j = 1
        for i in range(1,self.n):
            j = j * i
        print('%d!=%d'%(self.n,j))
# 方法二:定义一个方法,使用Process(target=)进行调用
def sunToN(n):
    s = 0
    for i in range(n):
        s += i
        print('1~%d的累加和为%d'%(n,s))

if __name__ == "__main__":
    n = input(('请输入一个数字:'))
    p1 = Process(target=subToN,args=(n,))
    p1.start()

    p2 = JieCheng(n)
    p2.start()
4.进程池
用于任务量巨大的时候
执行过程: 初始化Pool时,可以指定一个最大的进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会穿件一个新的进程用来执行该请求,但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行
# 1-50-1号进程   51-100-2号进程
import time
# 进程池
from multiprocessing import Pool

def foo(num):
    time.sleep(1)
    print(num)

if __name__ == '__main__':
    # 创建一个进程池
    pool = Pool(16)
    for i in range(100):
        # 创建一个进程 异步 每次循环将会用空闲的子进程去调用目标 进程池中少一个 就会产生一个新的添加进去
        pool.apply_async(func=foo,args=(i,))
        # 同步的 16个为一组 执行 每次执行完16个 再次产生16个去执行
        # pool.apply(func=foo,args=(i,))
    pool.close() #关闭进程池
    pool.join() #阻塞主进程

5.多进程中的资源共享 (通信)

from multiprocessing import Manager,Pool
import os

def reader(q):
    print('reader启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print('reader从Queue获取消息L%s'%q.get(True))
        
def write(q):
    print('write启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
    for i in 'XiongDiLian':
        q.put(i)
        
if __name__ == "__main__":
    print('(%s) start'%os.getpid)
    # 使用Manager中的Queue来初始化
    q = Manager.Queue() #实例化一个队列
    po = Pool() # 实例化进程池
    #使用阻塞模式创建进程,避免在reader中使用死循环
    #让write完全执行后,再让reader去读取
    po.apply(write,(q,))
    po.apply(reader,(q,))
    po.close()
    po.close()
    po.join()
    
    print('(%s) end'%os.getpid())
参数说明:
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,
那么就代表可接受的消息数量没有上限(直到内存的尽头);
  Queue.qsize():返回当前队列包含的消息数量;
  Queue.empty():如果队列为空,返回True,反之False ;
  Queue.full():如果队列满了,返回True,反之False;
  Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
    1. 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,
      如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
    2. 如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
  Queue.get_nowait():相当Queue.get(False);
  Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
    1. 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队
      腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
    2. 如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

  Queue.put_nowait(item):相当Queue.put(item, False);

原文地址:https://www.cnblogs.com/mswei/p/9325448.html