进程

1. 进程

引入模块

from multiprocessing import Process    #谋定噗rua赛斯听            #噗rua赛斯

进程之间是空间隔离的,不共享资源

进程的两种创建方法

创建进程的第一种方式:

    p1 = Process(target=func1, args=(1,))      #target(他给特)#args(啊渴死)
    p1.start()                        #start四大特

创建进行的第二种方式:

  自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法

class MyProcess(Process):
    def __init__(self,n,name):
        super().__init__()
        self.n = n
        self.name = name
    def run(self):
        # print(1+1)
        # print(123)
        print('子进程的进程ID',os.getpid())
        print('你看看n>>',self.n)

if __name__ == '__main__':
    p1 = MyProcess(100,name='子进程1')
    p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
    print('p1.name',p1.name)
    print('p1.pid',p1.pid)
    print('主进程结束')

 进程的其他方法 

import time
from multiprocessing import Process
def func1():
    time.sleep(2)
    print()
    print('子进程')
if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    p1.terminate() # 给操作系统发了一个关闭p1子进程的信号,关闭进程        terminate(涛妹雷特)
    time.sleep(1)              四离谱
    print('进程是否还活着:',p1.is_alive())#判断进程是否存活          is_alive(A子蓝付)
    print(p1.pid)
    print('主进程结束')
class MyProcess(Process):
    def __init__(self,n,name):
        super().__init__()
        self.n = n
        self.name = name
    def run(self):
        # print(1+1)
        # print(123)
        print('子进程的进程ID',os.getpid())
        print('你看看n>>',self.n)
if __name__ == '__main__':
    p1 = MyProcess(100,name='子进程1')
    p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
    print('p1.name',p1.name)
    print('p1.pid',p1.pid)
    print('主进程结束')
name和pid和ppid

join方法

global_num = 100
def func1():
    time.sleep(2)
    global global_num
    global_num = 0
    print('子进程全局变量>>>',global_num)
if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    print('子进程执行')
    #time.sleep(3)
    p1.join()  #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行
    print('主进程的全局变量>>>',global_num)
验证join方法
global_num = 100
def func1():
    start_time = time.time()
    time.sleep(2)
    global global_num
    global_num = 0
    print('子进程全局变量>>>',global_num)
    end_time = time.time()
    print(end_time - start_time)
if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    print('子进程执行')
    #time.sleep(3)
    p1.join()  #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行
    print('主进程的全局变量>>>',global_num)
join方法验证代码执行时间的
#for循环在创建进程中的应用
def fun1(n):
    time.sleep(1)
    print(n)

if __name__ == '__main__':
    pro_list = []
    for i in range(10):
        p1 = Process(target=fun1,args=(i,))
        p1.start()
        pro_list.append(p1)

    for p in pro_list:
        p.join()
    print('主进程结束')

守护进程   一定要在start之前设置守护进程

import time
import os
from multiprocessing import Process
def func1():
    time.sleep(5)
    print(os.getpid())
    print('子进程')

if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.daemon = True#将p1子进程设置为守护进程        daemon(地们)
    p1.start()
    # print('主进程的ID',os.getpid())
    print('主进程结束')

三、进程同步(锁)

问题1:为什么要加进程锁?

       线程锁是为了在线程不安全的时候,为一段代码加上锁来控制实现线程安全,即线程间数据隔离;

       进程间的数据本来就是隔离的,所以一般不用加锁,当进程间共用某个数据的时候需要加锁;

Look(唠嗑)模块    acquire(额快也)#加锁    release(蕊李四)还锁

加锁的另种形式:     with

import json
import time
import random
from multiprocessing import Process,Lock

def get_ticket(i,ticket_lock):
    print('我们都到齐了,大家预备!!123')
    #所有进程异步执行,到这里等待,同时再去抢下面的代码执行
    time.sleep(1)
    ticket_lock.acquire()  #这里有个门,只有一个人能够抢到这个钥匙,加锁
    with open('ticket','r') as f:
        #将文件数据load为字典类型的数据
        last_ticket_info = json.load(f)
    #查看一下余票信息
    last_ticket = last_ticket_info['count']
    #如果看到余票大于0,说明你可以抢到票
    if last_ticket > 0:
        #模拟网络延迟时间
        time.sleep(random.random())
        #抢到一张票就减去1
        last_ticket = last_ticket - 1
        last_ticket_info['count'] = last_ticket
        #将修改后的票数写回文件
        with open('ticket','w') as f:
            #通过json.dump方法来写回文件,字符串的形式
            json.dump(last_ticket_info,f)
        print('%s号抢到了,丫nb!' % i)
    else:
        print('%s号傻逼,没票啦,明年再来!' % i)
    #释放锁,也就是还钥匙的操作
    ticket_lock.release()
if __name__ == '__main__':
    #创建一个锁
    ticket_lock = Lock()
    for i in range(10):
        #将锁作为参数传给每个进程,因为每个进程都需要通过锁来进行限制,同步
        p = Process(target=get_ticket,args=(i,ticket_lock,))
        p.start()
模拟抢票

四、信号量(了解)    

Semaphore(赛呢fao)

import time
import random
from multiprocessing import Process,Semaphore

def dbj(i,s):
    s.acquire()
    print('%s号男主人公来洗脚'%i)
    print('-------------')
    time.sleep(random.randrange(3,6))
    # print(time.time())
    s.release()

if __name__ == '__main__':
    s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
    for i in range(10):
        p1 = Process(target=dbj,args=(i,s,))
        p1.start()
洗脚

五、事件(了解)

Event(额外特)

e.clear(可累儿)将e改成False

e.wait(威特)等待

e.set(赛特)将e是True

e.is_set()查看当前e的状态是True或者False

import time
import random
from multiprocessing import Process,Event

#模拟红绿灯执行状态的函数
def traffic_lights(e):
    while 1:
        print('红灯啦')
        time.sleep(5)
        e.set()
        print('绿灯亮')
        time.sleep(3)
        e.clear()  #将e改为了False
def car(i,e):

    if not e.is_set(): #新来的车看到是红灯
        print('我们在等待.....')
        e.wait()
        print('走你')
    else:
        print('可以走了!!!')

if __name__ == '__main__':
    e = Event()
    hld = Process(target=traffic_lights,args=(e,))
    hld.start()
    while 1:
        time.sleep(0.5)
        #创建10个车
        for i in range(3):
            # time.sleep(random.randrange(1,3))
            p1 = Process(target=car,args=(i,e,))
            p1.start()
红绿灯
复制代码
from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #创建一个事件对象
print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可通过set方法改为True
print('look here!')
# e.set()          #将is_set()的状态改为True。
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
# e.clear()        #将is_set()的状态改为False
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的状态 set-->True   clear-->False
#is_set     用来查看一个事件的状态
#wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
事件

六、进程间通信 IPC(重要)

  队列:(重点)

full(fao)

from multiprocessing import Process,Queue
#先进后出
'''
要注意的事项
q.full()队列满了返回True,不满返回False
队列为空的时候,get会阻塞          盖特
put超出了队列长度,你put插入数据的时候会阻塞     破特
print('>>>',q.empty())  #不可信,队列空了返回True,不为空返回False
'''
q = Queue(3)
q.put(1)#往队列里写入3个数据
q.put(2)
q.put(3)
print(q.get())#获取值
#可以用异常踹一下
while 1:
    try:
        q.get(False)  #queue.Empty
    except:
        print('队列目前是空的')

队列实现进程间的通信

import time
from multiprocessing import Process,Queue

def girl(q):
    print('来自boy的消息',q.get())
    print('来自校领导的凝视',q.get())
def boy(q):
    q.put('约吗')

if __name__ == '__main__':
    q = Queue(5)
    boy_p = Process(target=boy,args=(q,))
    girl_p = Process(target=girl,args=(q,))
    boy_p.start()
    girl_p.start()
    time.sleep(1)
    q.put('好好工作,别乱搞')
来自校领导的凝视

生产者消费者模型

import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)
    q.put(None)  #针对第三个版本的消费者,往队列里面加了一个结束信号
def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()

  

生产者消费者模型主进程发送结束信号

import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()
    pro_p.join()

    q.put(None)  

JoinableQueue的生产着消费者模型     Joinable(准呢包)Queue   张波Q

 q.task_done(她可死但)     给q对象发送一个任务结束的信号

import time
from multiprocessing import Process,Queue,JoinableQueue

def producer(q):
    for i in range(1,11):
        time.sleep(0.5)
        print('生产了包子%s号' % i)
        q.put(i)
    q.join()
    print('在这里等你')
def consumer(q):
    while 1:
        time.sleep(1)
        s = q.get()
        print('消费者吃了%s包子' % s)
        q.task_done()  #给q对象发送一个任务结束的信号

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = JoinableQueue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.daemon = True #
    con_p.start()
    pro_p.join()
    print('主进程结束')  

管道:

  进程间通信(IPC)方式二:管道(了解即可)

 Pipe(牌破)

from multiprocessing import Process,Pipe
def func1(conn1,conn2):
    try:
        msg = conn2.recv()
        print('>>>',msg)
        #如果管道一端关闭了,那么另外一端在接收消息的时候会报错
        msg2 = conn2.recv() #EOFError
    except EOFError:
        print('对方管道一端已经关闭')
        conn2.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=func1,args=(conn1,conn2,))
    p.start()
    conn1.send('小鬼!')
    conn1.close()
    # conn1.recv()  #OSError: handle is closed
管道报错模拟

数据共享:(了解)

Lock(啦个)

from multiprocessing import Process,Manager,Lock

def func(m_dic,ml):
    #不加锁的情况会出现数据错乱
    # m_dic['count'] -= 1
    #加锁,这是另外一种加锁形式
    with ml:
        m_dic['count'] -= 1

    #等同
    # ml.acquire()
    # m_dic['count'] -= 1
    # ml.release()

if __name__ == '__main__':
    m = Manager()
    ml = Lock()
    m_dic = m.dict({'count':100})
    # print('主进程', m_dic)
    p_list = []
    #开启20个进程来对共享数据进行修改
    for i in range(20):
        p1 = Process(target=func,args=(m_dic,ml,))
        p1.start()
        p_list.append(p1)
    [ppp.join() for ppp in p_list]

    print('主进程',m_dic)
manager不安全

进程池和mutiprocess.Poll(噗奥)

  

为什么要有进程池?进程池的概念。

  在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?

  在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
    
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
主要方法介绍

 apply(额扑来)      apply_async(额扑来森科)   Map(麦坡)  

import time
from multiprocessing import Pool,Process

#针对range(100)这种参数的
# def func(n):
#     for i in range(3):
#         print(n + 1)

def func(n):
    print(n)
    # 结果:
    #     (1, 2)
    #     alex
def func2(n):
    for i in range(3):
        print(n - 1)
if __name__ == '__main__':
    #1.进程池的模式
    s1 = time.time()  #我们计算一下开多进程和进程池的执行效率
    poll = Pool(5) #创建含有5个进程的进程池
    # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能
    poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能
    # poll.map(func2,range(100))  #如果想让进程池完成不同的任务,可以直接这样搞
    #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,如果想做其他的参数之类的操作,需要用后面我们要学的方法。
    # t1 = time.time() - s1
    #
    # #2.多进程的模式
    # s2 = time.time()
    # p_list = []
    # for i in range(100):
    #     p = Process(target=func,args=(i,))
    #     p_list.append(p)
    #     p.start()
    # [pp.join() for pp in p_list]
    # t2 = time.time() - s2
    #
    # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高
    # print('t2>>',t2) #结果:12.092015027999878s
进程池的简单应用以及进程池的效率对比

同步与异步两种执行方式:

同步:

import time
from multiprocessing import Process,Pool


def fun(i):
    time.sleep(0.5)
    # print(i)
    return i**2
if __name__ == '__main__':
    p = Pool(4)
    for i in range(10):
        res = p.apply(fun,args=(i,))  #同步执行的方法,他会等待你的任务的返回结果,
        print(res)

  

异步:

import time
from multiprocessing import Process,Pool

def fun(i):
    time.sleep(1)
    print(i)
    return i**2
if __name__ == '__main__':
    p = Pool(4)
    res_list = []
    for i in range(10):
        res = p.apply_async(fun,args=(i,))  
        res_list.append(res)

    p.close()  # 不是关闭进程池,而是不允许再有其他任务来使用进程池
    p.join()   # 这是感知进程池中任务的方法,进程池中所有的进程随着主进程的结束而结束了,等待进程池的任务全部执行完
    #循环打印结果
    for e_res in res_list:
        print('结果:',e_res.get())

    print('主进程结束')

  

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,
这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    # print('func1')
    return n*n


def func2(nn):
    print('func2>>',os.getpid())
    # print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主进程:',os.getpid())
    p = Pool(4)
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()

  

回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。

原文地址:https://www.cnblogs.com/xihuanniya/p/9838138.html