并发编程之进程

一、进程概述

(一)进程的基本概念

1、什么是进程?

  在说进程前,需要了解一下操作系统cpu是如何来执行任务的。cpu在某一时刻只能执行一个任务,只是由于cpu执行任务的速度较快,来回切换不同的任务,才会给我们感觉像是同一时刻执行多个任务一样。而这里cpu执行的任务就是所说的进程,可以理解为cpu某一时刻执行的一个过程、任务,它是一个抽象的概念。

  进程就是一个程序在一个数据集上的一次动态执行过程。举个例子,当你点开电脑上的某个程序,这个程序的执行过程就是一个进程;当你点击IE浏览器,浏览器软件并不是进程,但是它运行的过程就是一个进程。

2、进程的特点?

进程的内存是相互独立的,每一个进程是由三部分组成的,分别为:程序、数据集、进程控制块。

  • 程序

描述进程需要完成的功能以及如何完成

  • 数据集

在程序执行过程中所需要的资源

  • 进程控制块

描述进程的变化过程,系统基于此对进程进行管理和控制

3、为什么需要使用进程?

  进程是为了解决CPU的浪费问题,是为了让不同程序不断的切换,提高CPU的利用率,并且进程是最小的资源单位。cpu在执行任务时可以不断切换,减少等待时间,避免不必要的浪费。说的更直接一些就是进程可以实现并发。

(二)进程相关的其它概念

1、并发与并行

  • 并发

在资源有限的情况下,轮流交替的使用资源,比如一核cpu的电脑打开word和酷狗音乐的过程就是属于交替来执行这两个进程,最后打开这两个软件。

  • 并行

在资源充足的情况下,同时进行两项或者多项任务,比如四核cpu的电脑打开word和酷狗音乐的过程,这两个进程可以分别独占一个cpu。

总结:并发,在某一时刻有多个任务同时进行,这要求必须有多个处理器;并行,在一段时间内可以看出多个任务同时进行的。

2、同步与异步

  • 同步

当进程执行某个请求时,必须等到返回值才会进行后续的动作。

  • 异步

当进程执行某个请求时,无须等待,直接进行下一步动作,当有消息返回时系统会通知进行处理。

(三)Python实现多进程

1、Process实现多进程

from multiprocessing import Process
import time

def task(url):
    time.sleep(1)
    print(url,time.ctime())

url_list=[
    'https://www.baidu.com',
    'https://www.zhihu.com',
    'https://www.163.com',
]

if __name__ == '__main__':
    p_list = []
    for url in url_list:
        p = Process(target=task,args=(url,))
        p_list.append(p) #为join方法做准备
        p.start()
    for i in p_list:
        i.join() #子进程执行结束后才能接着主进程

  在上述代码中,循环url_list创建了3个子进程,传入的参数分别是执行函数和函数参数,创建3个Process实例,使用start启动,join方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

注意:上述一个有4个进程,主进程就是这个程序运行的本身,另外3个子进程就是根据url_list创建的。

进程的创建是非常耗费资源的,相当于拷贝了一份主进程的空间,这样进程之间的内存各个都是独立的。

  虽然耗费资源,但是它也保证了并发,每一个进程中至少有一个线程,GIL锁允许cpu同一时刻处理一个线程,上述同一时刻3个进程就可以有三个线程被处理,如果cpu的数量足够(比如四核cpu),这就是并行了。

2、进程池实现多进程

如果需要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import time

def task(url):
    time.sleep(1)
    print(url,time.ctime())

url_list=[
    'https://www.baidu.com',
    'https://www.zhihu.com',
    'https://www.163.com',
]

if __name__ == '__main__':
    p_list = []
    p=Pool(4) #Pool对象,最多同时执行4个进程,cpu的个数
    for url in url_list:
        p.apply_async(task, args=(url,))
        p_list.append(p)
    print('Waiting for all subprocesses done...')
    p.close() #调用join()之前必须先调用close()
    for i in p_list:
        i.join() #等待所有的子进程执行完
    print('All subprocesses done.')

二、Process类及进程池

(一)属性、方法

1、属性

  daemon:默认值为False,如果设为True,代表进程为后台运行的守护进程,当守护进程的父进程终止时,守护也随之终止,并且设定为True后,守护不能创建自己的新进程,必须在守护进程运行之前设置

  name:进程名字。

  pid:进程号。

2、构造方法

Process([group [, target [, name [, args [, kwargs])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

3、实例方法

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

(二)进程池

  • 常用方法

  apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

  close():关闭Pool,使其不再接受新的任务;

  terminate():不管任务是否完成,立即终止;

  join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

三、concurrent.futures模块

  • 该模块为了并行任务提高更高级别的结构
  • 为了执行异步调用该模块既可以实现进程池也可以实现线程池
from concurrent.futures import ProcessPoolExecutor
import time

def task(url):
     time.sleep(1)
     print(url,time.ctime())
pool=ProcessPoolExecutor(4)

url_list=[
    'https://www.baidu.com',
    'https://www.zhihu.com',
    'https://www.163.com',
]

for url in url_list:
    pool.submit(task,url) #相当于apply_async()异步方法

pool.shutdown() #相当于close和join方法

值得注意的是,使用这个模块执行submit方法可以将future接收,传入到回调函数单独处理结果。

####加入回调函数,对返回的结果在单独的一个函数中进行处理####
from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    """
    执行任务返回结果
    :param url: 
    :return: 
    """
    response=requests.get(url)
    return response
    #对返回狗的结果进行处理

############done为回调函数,task执行的结果返回给future,将结果与之后的动作分离开来#################
def done(future,*args,**kwargs):
    """
    获取结果并且进行处理
    :param future: 
    :param args: 
    :param kwargs: 
    :return: 
    """
    response=future.result() 
    print(response)


pool=ProcessPoolExecutor(4) 

url_list=[
    'https://www.baidu.com',
    'https://www.zhihu.com',
    'https://www.163.com',
]
for url in url_list:
    res=pool.submit(task,url) #接收future 相当于apply_async()异步方法
    res.add_done_callback(done) #将future传入到回调函数

pool.shutdown() #相当于close和join方法
加入回调函数

四、进程间通讯

进程中内存是保持独立的,数据也是各自持有一份,默认是无法共享的。

(一)进程队列Queue

import multiprocessing
import time

def handle_message(q,i):
    time.sleep(1)
    q.put(i)

if __name__ == '__main__':
    q = multiprocessing.Queue()

    p_list = []
    for i in range(3):
        p = multiprocessing.Process(target=handle_message,args=(q,i))
        p.start()
        p_list.append(p)

    for j in p_list:
        j.join()

    print(q.get())
    print(q.get())
    print(q.get())

#输出
#0
#2
#1

(二)管道

管道中的数据时单向流动,所以如果要建立进程之间的通讯,则需要2个管道,这种通讯方式只能在亲戚俄关系的进程间使用,比如父子进程。

import multiprocessing

def handle_message(conn):
    conn.send("I am your son!")
    res = conn.recv()
    print('child_res',res)
    conn.close()


if __name__ == '__main__':
    parent_conn,child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=handle_message,args=(child_conn,))
    p.start() #开始子进程
    parent_res = parent_conn.recv()
    print('parent_res',parent_res)
    parent_conn.send("I am your father!")
    p.join() #等待子进程结束,再进行主进程
    
#####输出####
#parent_res I am your son!
#child_res I am your father!

(三)Managers

Managers实现了进程间数据的共享,Queue以及Pipe只是实现了数据的交互,即一个进程去更改另一个进程的数据。

from multiprocessing import Manager,Process
import os

def handle_message(d,l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print('handle_message d',d)
    print('handle_message l',l)


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict() #生成一个字典,可在多个进程间共享、传递
        l = manager.list() #生成一个列表,可在多个进程间共享、传递

        p_list = []
        for i in range(3):
            p = Process(target=handle_message,args=(d,l))
            p.start()
            p_list.append(p)

        for j in p_list: #等待子进程结果
            j.join()

        print(d)
        print(l)

#输出
"""
handle_message d {7856: 7856}
handle_message l [7856]
handle_message d {7856: 7856, 8052: 8052}
handle_message l [7856, 8052]
handle_message d {7856: 7856, 8168: 8168, 8052: 8052}
handle_message l [7856, 8052, 8168]
{7856: 7856, 8168: 8168, 8052: 8052}
[7856, 8052, 8168]
"""

五、进程同步

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

(一)multiprocessing.Lock 同步锁/互斥锁

这就相当于资源在一个房间里,所有的进程都在房子外等着,房子外面挂着一把钥匙,谁拿着钥匙就进去,直到钥匙被归还,另一个进程再拿着钥匙进去。

from multiprocessing import Lock,Process

def handle(i,lock):
    lock.acquire()#拿到钥匙
    print(i)
    lock.release() #归还钥匙


if __name__ == '__main__':
    lock = Lock()

    for i in range(5):
        p = Process(target=handle,args=(i,lock))
        p.start()

死锁:当多个进程抢夺同一个资源而造成互相等待的现象,若无外力作用将会一直持续下去,此时系统处于死锁状态。

(二)multiprocessing.Semaphore 信号量

同步锁允许一个进程更改数据的,但是信号量允许一定数量的进程更改数据。

信号量同步是基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1;当计数器为0时,acquire()调用被阻塞,直到有进程调用release()。

值得注意的是信号量也是锁,只是在内部加了一个计算器。

在火车站内需要对顾客进行检查,假设每次只能检查4个人,然后检查完毕的人再换下个顾客来检查,这里每次检查4个人就是信号量可以并发处理4个进程。

from multiprocessing import Semaphore,Process
import time

def check_person(sem,i):
    sem.acquire() #4个人,相当于4个进程取下钥匙
    print("%s走进检查室"%i,time.ctime())
    print(time.sleep(1))
    print("%s检查完毕"%i,time.ctime())
    sem.release() #释放钥匙,下一个进程取下钥匙

if __name__ == '__main__':
    sem = Semaphore(4) #信号量最多同时处理4个进程

    for i in range(15):
        p = Process(target=check_person,args=(sem,i,))
        p.start()

####输出####
"""
0走进检查室 Fri Sep 27 17:53:23 2019
1走进检查室 Fri Sep 27 17:53:23 2019
3走进检查室 Fri Sep 27 17:53:23 2019
5走进检查室 Fri Sep 27 17:53:23 2019
None
0检查完毕 Fri Sep 27 17:53:24 2019
4走进检查室 Fri Sep 27 17:53:24 2019
None
1检查完毕 Fri Sep 27 17:53:24 2019
6走进检查室 Fri Sep 27 17:53:24 2019
None
3检查完毕 Fri Sep 27 17:53:24 2019
...
"""

(三)multiprocessing.Event 事件

1、多进程事件运行的机制

全局定义了一个Flag,如果Flag值为 False,当程序执行event.wait()方法时就会阻塞,如果Flag值为True时,程序执行event.wait()方法时不会阻塞继续执行。

 2、事件的方法

  • wait()

wait是否阻塞是看event对象内部的Flag的值

  • set()

将Flag改为True,解除阻塞

  • clear()

将Flag改为False,继续阻塞

  • is_set()

判断当前的Flag的值

3、事件的使用

以汽车过红绿灯为例,当为红灯时,汽车不能通过十字路,当为绿灯时,汽车是可以通过十字路:

from multiprocessing import Event,Process
import time
import random

def cars(e,i):
    if not e.is_set():
        print("%s车等待在十字路口"%i)
        e.wait()  #阻塞,直到flag变成True
    else:
        print("%s车通过了十字路口"%i)

def light(e):
    while True:
        if e.is_set():#绿灯
            e.clear() #将flag改为False,进程阻塞,
            print("红灯亮了")
        else: #默认走else,因为默认flag是False
            e.set() #将flag改为True,此时执行car进程,绿灯车通过
            print("绿灯亮了")

if __name__ == '__main__':
    e = Event() #默认为False,红灯亮
    l = Process(target=light,args=(e,)) #红绿灯进程
    l.start() #启动红绿灯进程

    car_list = []
    for i in range(10): #10辆车过红绿灯
        time.sleep(random.random())
        car = Process(target=cars,args=(e,i,))
        car.start()
        car_list.append(car)
    [car.join() for car in car_list]

在这里,汽车进程中首先需要判断e.is_set(),也就是事件的状态,然后改变事件的状态,通过红绿灯(信号)进程改变这个状态,当flag为True时解除阻塞状态,绿灯行。

在这个实例中使用事件:

  • 创建事件对象
  • 分别将其传入红绿灯进程和汽车进程对象
  • 红绿灯(信号)进程控制flag状态
  • 汽车进程依据flag状态做出相应的反应

参考:

https://www.cnblogs.com/yuanchenqi/articles/6248025.html

https://www.cnblogs.com/zhangfengxian/p/python-process-pool.html

原文地址:https://www.cnblogs.com/shenjianping/p/11593606.html