Python开发之进程

进程介绍

1、基本概念

狭义定义:进程是正在运行的程序的实例。

广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。

进程的概念:

第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。

第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。

进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。

2、进程的并发与并行

并行: 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )

并发: 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

区别:并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。

           并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

3、进程的状态图

(1)就绪(Ready)状态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

(2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

(3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

4、同步和异步

所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

5、阻塞/非阻塞

 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

进程的创建与结束

1、进程的创建

对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程:

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。

 2、进程的结束

  • 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
  • 出错退出(自愿,python a.py中a.py不存在)
  • 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
  • 被其他进程杀死(非自愿,如kill -9)

Python中进程的操作

multiprocess模块:multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

1、process模块:process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

小试牛刀

from multiprocessing import Process

def func():
    print('process')

if __name__ == '__main__':
    p = Process(target=func,)  # 这个过程叫注册,p是一个进程对象,还没有启动进程
    p.start()

带参的Process

from multiprocessing import Process
import os
def func(args):
    print('process',args)
    print('子进程', os.getpid())

if __name__ == '__main__':
    p = Process(target=func,args=(1,))  # 这个过程叫注册,p是一个进程对象,还没有启动进程
    p.start()
    print('父进程',os.getpid())
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称
1 p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

注意:

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。

开启了子进程的主进程:子进程自己的代码如果长,等待自己的代码执行结束;子进程的执行时间长,主进程会在主进程代码执行完毕之后等待子进程执行完毕之后,主进程才结束。

2、多进程中的几个方法

  • join()方法:感知一个子进程的结束,从异步变成同步。
from multiprocessing import Process
import os
import time
def func(arg1,arg2):
    print('process',arg1)
    time.sleep(1)
    print('process',arg2)
    print('子进程', os.getpid())

if __name__ == '__main__':
    p = Process(target=func,args=(10,20))  # 这个过程叫注册,p是一个进程对象,还没有启动进程
    p.start()
    print('wahahah') # 这里是异步的
    p.join()
    print('运行完了!!!!') # 子进程运行完后,才执行这句,就可以使用join()
from multiprocessing import Process
import os
import time
def func(arg1,arg2):
    print('process',arg1)
    time.sleep(1)
    print('process',arg2)

if __name__ == '__main__':
   lst = []
   for i in range(10):
       p = Process(target=func,args=(i,i*10))
       p.start()
       lst.append(p)
   [i.join() for i in lst]
   print('运行完了')
  • 第二种开启进程的方法
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg

    def run(self):
        print('hahahha',self.arg)

if __name__ == '__main__':
    p1 = MyProcess(10)
    p1.start()
'''
1、自定义类继承Process类
2、必须实现一个run方法,run方法中是在子进程中执行的代码
'''

3、进程之间的数据隔离问题

'''
多进程之间的数据隔离问题
'''
from multiprocessing import Process

def func():
    global n # 声明了一个全局变量
    n = 0 # 重新定义了n
    print('>>>',n) # 0

if __name__ == '__main__':
    n = 100
    p = Process(target=func)
    p.start()
    p.join()
    print('<<<',n) # 100

4、守护进程

会随着主进程的结束而结束。

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

''
守护进程会随着主进程的代码执行完毕而结束。
'''
from multiprocessing import Process
import time

def func():
    while True:
        time.sleep(0.5)
        print('I am living....')

def func2():
    print('in func2')
    time.sleep(8)
    print('func2 finished')

if __name__ == '__main__':
    p = Process(target=func, )
    p.daemon = True # 设置子进程为守护进程
    p.start()

    p1 = Process(target=func2())
    p1.start()
    
    i = 0
    while i<10:
        print('this is socket server')
        time.sleep(1)
        i+=1

5、锁LOCK

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
'''
模拟抢票
'''
from multiprocessing import Process,Lock
import json,time

def show(i):
    with open('ticket') as f:
        dic = json.load(f)
    print('余票:%s'%dic['ticket'])

def buy_ticket(i,lock):
    lock.acquire() # 拿钥匙
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket']>0:
        dic['ticket'] -=1
        print('%s买到票了'%i)
    else:
        print('%s来晚了,没买着票'%i)

    time.sleep(1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release() # 还钥匙

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=show,args=(i,))
        p.start()
    lock = Lock() # 给买票加锁
    for i in range(10):
        p1 = Process(target=buy_ticket,args=(i,lock))
        p1.start()

6、信号量

互斥锁同时只允许一个进程更改数据,而信号量Semaphore是同时允许一定数量的进程更改数据 。

信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def ktv(i,sem):
    sem.acquire() # 获取钥匙
    print('%s走进ktv'%i)
    time.sleep(random.randint(6,10))
    print('%s走出ktv'%i)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4) # Semaphore同样也是利用Lock的机制。
    for i in range(10):
        p = Process(target=ktv,args=(i,sem))
        Semaphore()
        p.start()

7、事件

通过一个信号来控制多个进程同时执行或者阻塞。

from multiprocessing import  Process,Event

# 一个信号可以使所有的信号进入阻塞状态,也可以控制所有的进程解除阻塞,一个事件被创建之后,默认就是阻塞状态。
e = Event() # 创建一个事件
print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞。
print(123)
e.set() # 将这个事件的状态改为True
e.wait() # 是根据e.is_set()的值来决定是否是阻塞的。
print(123456)

e.clear() # 将这个事件的状态改为False

队列

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递

基本方法:

Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

代码验证:

from multiprocessing import Queue

q = Queue(5)
print(q.qsize()) # 0
q.put(1)
q.put(11)
q.put(111)
q.put(1111)
q.put(11111)
print(q.full()) # 看队列是否满了True
print(q.qsize()) # 大小是5
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #看队列是否空了

进程间通信例子:

from multiprocessing import Process,Queue
'''
实现两个子进程之间的通信
'''
def produce(q): #生产
    q.put('haha')

def consume(q): #消费
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    p1 = Process(target=consume,args=(q,))
    p1.start()

队列中重要的模型:消费者生产者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 生产者消费者代码一

from multiprocessing import Process,Queue
import time,random
'''
生产者:进程
消费者:进程,两者互不影响
队列是进程安全的
'''
def consume(q,name):
    while True:
        food = q.get()
        if food is None:
            break       #收到结束信号就结束
        print('%s消费了%s'%(name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__ == '__main__':
    q = Queue(20)
    p = Process(target=producer,args=('jack','包子',q))
    p1 = Process(target=producer, args=('pp', '馒头', q))
    c1 = Process(target=consume, args=(q,'ap'))
    p.start()
    p1.start()
    c1.start()

    p.join() # 用来感知生产者的结束
    p1.join()
    q.put(None)
    q.put(None)

生产者消费者代码二

from multiprocessing import Process,JoinableQueue
import time,random
'''
生产者:进程
消费者:进程,两者互不影响
队列是进程安全的
'''
def consume(q,name):
    while True:
        food = q.get()
        print('%s消费了%s'%(name,food))
        time.sleep(random.randint(1,3))
        q.task_done() #count - 1

def producer(name,food,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join() #阻塞,感知一个队列中的数据全部被执行完毕

if __name__ == '__main__':
    q = JoinableQueue(20)
    p = Process(target=producer,args=('jack','包子',q))
    p1 = Process(target=producer, args=('pp', '馒头', q))
    c1 = Process(target=consume, args=(q,'ap'))
    p.start()
    p1.start()
    c1.daemon=True #设置为守护进程,主进程中的代码执行完毕之后,子进程自动结束
    c1.start()
    p.join() # 用来感知进程的结束
    p1.join()
'''
消费者端:
每次获取一个数据,处理一个数据,发送一个记号,标志一个数据被处理成功
生产者端:
每一次生产一个数据,且每一次生产的数据都放在队列中,在队列中刻上一个记号,当生产者全部生产完毕,发送join信号
停止生产数据了,且要等待之前被刻上的记号全部被消费完,当数据都被处理完时,join阻塞结束。
===
consume中把所有的任务消耗完
producer端的join感知到,停止阻塞
所有的producer进程结束
主进程中的p.join()结束
主进程中代码结束
守护进程(消费者的进程)结束
''' 

 进程池

定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

multiprocess.Pool模块

Pool([numprocess [,initializer [, initargs]]]):创建进程池

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
from multiprocessing import Pool

def func(num):
    print('this is the pool')
    for i in range(10):
        print(num + i)

if __name__ == '__main__':

    p = Pool(3)  # 3个进程
    p.map(func,range(10)) # 10个任务,第二个参数必须是可迭代的。
    
from multiprocessing import Pool
import time,os
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s'%n,os.getpid())

if __name__ == '__main__':
    pool = Pool()
    for i in range(10):
        pool.apply_async(func,args=(i,))
    pool.close() # 结束进程池接受任务
    pool.join() # 感知进程池中的任务执行结束
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
6    
7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
8 
9 P.join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

进程池的返回值问题

from multiprocessing import Pool

def func(i):
    return i*i

if __name__ == '__main__':
    p = Pool()
    for i in range(10):
        res = p.apply(func,args=(i,))
        print(res)

异步的返回值

from multiprocessing import Pool
import time
def func(i):
    time.sleep(0.5)
    return i*i

if __name__ == '__main__':
    p = Pool()
    res_l = []
    for i in range(10):
        res = p.apply_async(func,args=(i,)) # apply的结果就是func的返回值
        res_l.append(res)
    for res in res_l:print(res.get()) #等着func的计算结果

map异步提交的返回值

from multiprocessing import Pool
import time
def func(i):
    time.sleep(0.5)
    return i*i

if __name__ == '__main__':
    p = Pool()
    res = p.map(func,range(10))
    print(res)

map自带close和join方法。

回调函数

from multiprocessing import Pool

def func1(n):
    print('in func1')
    return n*n

def func2(nn):
    print('in func2')
    print(nn)

if __name__ == '__main__':
    p = Pool()
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了啊,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
回调函数不能传参数,只能等待子进程的返回值传递给回调函数,回调函数是主进程。
from multiprocessing import Pool
import os
def func1(n):
    print('in func1',os.getpid()) # in func1 3052
    return n*n

def func2(nn):
    print('in func2',os.getpid()) # in func2 7164
    print(nn)

if __name__ == '__main__':
    p = Pool()
    p.apply_async(func1,args=(10,),callback=func2)
    print(os.getpid()) # 7164
    p.close()
    p.join()

  

原文地址:https://www.cnblogs.com/crazyforever/p/9465118.html