进程

一.守护进程

  Process中的一个daemon属性,默认False

  当daemon设置为True时,守护进程会随着主程序的结束而强制结束

二.进程锁(同步锁/互斥锁)

  进程锁:把一段代码进行加锁,防止多进程同时进入导致数据不安全.

      由并发改变为串行,牺牲效率,保证数据安全不混乱

三.multiprocessing模块中的Queue队列

  队列(先进先出):实现了进程之间的通信

from multiprocessing import Process ,Queue

def f1(q):
    q.put(6661)    # 把需要给主进程的消息放在队列里
    q.put(6662)
    q.put(6663)
    q.put(6664)


if __name__ == '__main__':
    q = Queue(3)            # 创建一个队列并传给子进程
    p = Process(target=f1,args=(q,))
    p.start()

    print("子进程放在队列里的消息:",q.get())
基于队列的进程之间通信

Queue的方法

put():放入队列,放满了程序就会阻塞

get():从队列中拿数据,队列为空的时候程序就会阻塞

put_nowait():跟put一样,但是不会阻塞,只会报错

get_nowait():跟get一样,但是不会阻塞,只会报错

qsize():返回队列当前数量

full():判断队列是不是满了

empty():判断队列是不是空了

四.生产者消费者模型

  生产者消费者模型是通过一个容器来进行平衡,解耦.

from multiprocessing import Process ,Queue
import time

def producer(q):

    for i in range(10):
        time.sleep(0.2)
        s = f"包子{i}号"
        print(f"{s}出炉")
        q.put(s)
    q.put(None)

def consumer(q):

    while 1:
        time.sleep(0.5)
        baozi = q.get()
        if baozi == None:
            print("没了")
            break
        print(f"{baozi}被吃了  ")


if __name__ == '__main__':
    q = Queue(10)

    pro_p = Process(target=producer,args=(q,))
    con_p = Process(target=consumer,args=(q,))
    pro_p.start()
    con_p.start()
Queue收发信号的生产者消费者模型
from multiprocessing import Process,JoinableQueue
import time

def producer(q):
    for i in range(10):
        time.sleep(0.2)
        s = f"包子{i}号"
        print(f"{s}出炉")
        q.put(s)
    q.join()            # 等待没有task_done再来了   再结束
    print("接受完task_done()了")

def consumer(q):

    while 1:
        time.sleep(0.5)
        baozi = q.get()
        print(f"{baozi}被吃了  ")
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    pro_p = Process(target=producer,args=(q,))
    con_p = Process(target=consumer,args=(q,))

    pro_p.start()
    con_p.daemon = True
    con_p.start()

    pro_p.join()        # 等pro_p的执行完才结束主进程

    print("完事")
JoinableQueue生产者消费者模型

JoinableQueue中的收发信号方式

  task_done():给队列发送一个完成队列get()完成的信号

  join():等待task_done()最后一次信号,也就是等待get()全部拿完,再往下执行.

五.multiprocessing模块中的Pipe管道

  管道:实现了进程之间的通信(第二种方式)

from multiprocessing import Process,Pipe

def f1(conn1):
    msg = conn1.recv()
    print(msg)
    conn1.send("你说呢!")


if __name__ == '__main__':
    conn1,conn2 = Pipe()    # 创建一个全双工通道

    p = Process(target=f1,args=(conn1,))
    p.start()
    conn2.send("你走到哪了?")
    msg = conn2.recv()          # 只能一个收一个发,不能自己收发自己的
    print(msg)

"""
conn1,conn2 = Pipe()    # 创建通道
conn.recv()
conn.send()
"""
管道的简单使用

管道的创建和方法

conn1,conn2 = Pipe()    全双工通道

recv():收消息,不能收自己的消息

send():发消息,不能发给自己消息

六.multiprocessing模块中的Event事件

  事件:控制状态,使子进程或主进程阻塞

from multiprocessing import Process,Event

def f1(msg):

    print("计算中...")
    n = 10
    print("结算完,结果为:",n)
    msg.set()               # 事件状态变为true
    print(msg.is_set())

if __name__ == '__main__':

    e = Event()   # 创建一个时间对象  默认为False
    p = Process(target=f1,args=(e,))
    p.start()
    print(e.is_set())       # 查看当前事件状态
    e.wait()                # 等待事件,事件状态为true就可以往下执行
    print("可以拿计算结果了")


"""
e = Event()    创建事件对象,事件状态默认为False
e.wait()        等待事件,当事件状态为True再往下执行
e.is_set()      查看当前事件状态
e.set()         改变事件状态为True
e.clear()       清除事件状态,变为False
"""
事件的简单使用

事件中的方法

wait():等待事件状态为True,再往下执行

set():将事件状态改为True

is_set():查看事件当前状态

clear():将事件状态改为False

七.multiprocessing模块中的Semaphore信号量

  信号量:类似于多把(最少一把)进程锁

import time,random
from multiprocessing import Process,Semaphore

def f1(s,i):
    s.acquire()
    print(f"轮到{i}号")
    time.sleep(random.randint(1,3))
    print(f"{i}号完事")
    s.release()

if __name__ == '__main__':
    s = Semaphore(4)

    for i in range(10):
        p = Process(target=f1,args=(s,i))
        p.start()

"""
s = Semaphore(n)    创建信号量(类似于多个进程锁)   n为锁的数量
s.acquier()         获取锁
s.release()         释放锁
"""
信号量的简单使用

信号量的方法

  acquire():获取锁

  release():释放锁

八.multiprocessing模块中的Pool进程池

  进程的创建和销毁是很耗内存的,影响代码执行效率,所以有了进程池  

1.进程池中的map方法是异步发行任务,并且第二个参数是可迭代对象,自带close()和join()

import time,os
from multiprocessing import Process,Pool

def f1(i):
    # time.sleep(1)
    for el in range(5):
        i += el

if __name__ == '__main__':

    s_time = time.time()
    pool = Pool(8)              # 创建一个进程池
    pool.map(f1,range(100))     # 进程池里的map()  异步发行任务 自带close()和join()
    e_time = time.time()
    sum_time = e_time - s_time
    print("进程池",sum_time)

    p_lst =[]
    start_time = time.time()
    for i in range(10):
        # print("xxx")
        p = Process(target=f1,args=(i,))
        p.start()
        p_lst.append(p)

    [pp.join() for pp in p_lst]     # 这里的join是Process中的join
    end_time = time.time()
    s_time = end_time - start_time
    print("多线程",s_time)

"""
pool = Pool(n)      创建进程池,n为进程池内的最多进程数量      
pool.map()          异步发行任务,但是一次执行的任务数量跟n有关,自带close()和join().跟映射函数用法一样

"""
进程池中的map方法

2.进程池中的同步和异步

同步

  res = apply(任务,args=(参数,)) 同步发行任务,可以直接拿到返回结果res

异步

  res = apply_async(任务,args=(参数,)) 异步发行任务,拿到的是结果对象,需要用get()方法拿到结果,get()会阻塞程序

from multiprocessing import Process,Pool
import time


def f1(n):
    time.sleep(0.5)
    # print(n)
    return n*n


if __name__ == '__main__':
    pool = Pool(4)
    for i in range(10):
        res = pool.apply(f1, args=(i,))    # 同步
        print(res)

    p_lst = []
    for i in range(10):
        res = pool.apply_async(f1,args=(i,))   # 异步,但是只能在进程池中执行.进程池多大就异步几个
        p_lst.append(res)

    pool.close()    # 关闭进程池,不让其他程序往进程池中扔新的任务,确保没有新的任务在进程池中
    pool.join()    # Pool 里面的join方法

    for el in p_lst:
        print(el.get())

    time.sleep(3)

"""
pool = Pool(n)      创建进程池,n为进程池内的最多进程数量  
pool.apply(任务,参数)                同步发行任务,
pool.apply_asncy(任务,参数)          异步发行任务
pool.join()         等待进程池中进程都执行完再往下执行,配合pool.close()使用
pool.close()        关闭进程池,不让进程池再有新的任务
"""
进程池的同步和异步的两个方法

两个方法

  close():关闭进程池接收任务,不让进程池再有新的任务

  join():等待进程池中的任务都执行完了再往下执行,配合close()使用

九.进程池中的回调函数

  回调函数:接收异步进程池中子进程的返回值,属于主程序,完成了进程的通信

  apply_asncy(任务,args=(参数,),callback=function) 将任务的返回结果作为参数传给callback指定的函数

from multiprocessing import Process, Pool
import time

def f1(n):
    print(n)
    return n * n

def callback_fn(cb):
    print(cb)

if __name__ == '__main__':
    pool = Pool()
    res = pool.apply_async(f1, args=(5,), callback=callback_fn)  # 回调函数可以接收子进程的结果
    # ret = pool.apply(f1,args=(5,))   # 同步中没有回调函数

    # pool.close()
    # pool.join()
    # time.sleep(3)
    print(res.get())
回调函数的简单使用

原文地址:https://www.cnblogs.com/q767498226/p/10253181.html