Python 第三十四章 进程通信 文件通信+队列通信 + 锁 join+ lock

进程通信

"""
进程在内存级别是隔离的,但是文件在磁盘上,
1.基于文件通信
利用抢票系统讲解

2.基于队列通信

3.基于管道的通信
"""

文件通信

# 抢票系统
# 1.先可以查票,查询余票数 并发
# 2.进行购买,向服务端发送请求,服务端接收请求,在后端将票数-1,返回到前端 串行

from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import random


# task一个总进程
def search():
    """
    查看余票
    :return:
    """
    time.sleep(random.randint(1,3))
    with open('ticket.json',encoding='utf-8') as f1:
        dic = json.load(f1)
        print(f'{os.getpid()}查看了票数,剩余{dic["count"]}')
def paid():
    """
    支付
    打开文件写入文件转成json模式
    :return:
    """
    with open('ticket.json', encoding='utf-8') as f1:
        dic = json.load(f1)
    if dic['count'] > 0:
        dic['count'] -=1
        time.sleep(random.randint(1,3))
        with open('ticket.json',encoding='utf-8',mode='w') as f1:
            json.dump(dic,f1)
        print(f'{os.getpid()}购买成功')

def task(lock): # task进程
    search()

    lock.acquire()
    paid()
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(6):
        p = Process(target=task,args=(mutex,))
        p.start() # 启动


# 当很多进程共抢一个资源数据时,要保证顺序数据安全,一定要串行
# 互斥锁:可以公平性的保证顺序以及数据的安全

# 基于文件的进程之间的通信
# 缺点:效率低,自己加锁会麻烦,容易出现死锁
# 优点:能通信了

"""
几乎同时6个进程开启,都进入了一个查票环节,
task1进入search,睡了3s,其他的进程都进入查票,查票此时是并发
当task2进程进入买票环节,读取文件票数,将文件票数-1,
进入阻塞状态,其他的进程陆续执行同样的操作,这样就造成了数据不安全
"""

队列通信

"""
"""
"""
队列:一个容器,这个容器可以承载一些数据 Queue
队列的特性:先进先出永远保持这个顺序 FIFO first in first out
"""
from multiprocessing import Queue
q = Queue()
def func():
    print('in func')
    return '函数'
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(func())

print(q.get())
print(q.get())
print(q.get())
print(q.get())



# 设置大小
from multiprocessing import Queue
q = Queue(3) # maxsize的最大值
def func():
    print('in func')
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(func) # 若put超出最大值则队列满了  进程put会阻塞 无法执行下面的代码

print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 若get满了,当数据取完时,在进程get数据也会出现阻塞,直到再put一个数据

# 一个进程,几个put就几个get
#
# block = False 只要遇到阻塞就会报错
from multiprocessing import Queue
q = Queue(3) # maxsize
def func():
    print('in func')
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(555,block=False)

print(q.get())
print(q.get())
print(q.get())
print(q.get())

# timeout = 3 延时报错 超过3秒还阻塞,抛出异常
from multiprocessing import Queue
q = Queue(3) # maxsize
q.put(1)
q.put('alex')
q.put([1,2,3])

print(q.get())
print(q.get())
print(q.get())
print(q.get(timeout=3))

join

# join让主进程等待子进程的结束,再执行主进程
from multiprocessing import Process
import time

def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone') # 主进程

if __name__ == '__main__':
    p = Process(target=task,args=('zs',))# 创建一个进程对象
    p.start() # 启动p子进程同时启动主进程
    p.join() # 必须等待p执行完再执行主
    print('主进程')
    # 输出
    # zs is running
    # zs is gone
    # 主进程

# join开启多个子进程
# 未开启时:先执行主进程,再一个一个执行子进程(先执行时间短的)
from multiprocessing import Process
import time

def task(name,sec):
    print(f'{name} is running')
    time.sleep(sec)
    print(f'{name} is gone') # 主进程

if __name__ == '__main__':
    start_time = time.time()
    # 同一个时刻开启4个进程,并发或者并行,按照最大的时间走
    p = Process(target=task,args=('zs',2))
    p2 = Process(target=task,args=('zs2',7))
    p3 = Process(target=task,args=('zs3',5))
    p.start()
    p2.start()
    p3.start()
    print(f'主进程消耗的时间{time.time() - start_time}')  # 主进程消耗的时间与其他进程无关 0.000123秒
    # 输出:
    # 主进程消耗的时间0.0050699710845947266
    # zs is running
    # zs2 is running
    # zs3 is running
    # zs is gone
    # zs3 is gone
    # zs2 is gone

# 验证1:
    # join只针对主进程,如果join下面多次join是不阻塞的
    # 不会按照一行一行输出,同时开始执行,最后执行主进程
    # 必须等待最长的1个p执行完再执行主进程
    # p,p1,p2 同时进行,p执行速度快,先执行完,执行时阻塞不执行'2秒'主进程2秒后执行主进程'2秒'
    # 执行p的时候同时执行p1和p2 p2执行速度比p3慢 p3先执行完,执行完后join无法阻塞主进程
    # p3执行完,p2再过2秒也执行完,p2执行完后,直接执行'7'秒和'5'秒的主进程
    p.join()
    print('2秒')
    p2.join()
    print('7秒')
    p3.join()
    print('5秒')

    print(f'主进程消耗的时间{time.time() - start_time}')  # 并发执行主进程消耗的时间 3.0387587秒
    # p1和p2和p3 同时运行 按照最长的时长打印
    # 输出
    # 主进程消耗的时间0.005324602127075195
    # zs is running
    # zs2 is running
    # zs3 is running
    # zs is gone
    # 2秒
    # zs3 is gone
    # zs2 is gone
    # 7秒
    # 5秒
    # 主进程消耗的时间7.008730888366699



# 验证2
# 对验证1进行优化代码  循环打印

# 正确示范
from multiprocessing import Process
import time
def task(sec):
    print(f'is running')
    time.sleep(sec)
    print(f'is gone')

if __name__ == '__main__':
    start_time = time.time()
    l1 = []
    for i in range(1,4):
        p = Process(target=task,args=(i,))
        l1.append(p)
        p.start()

    for i in l1:
        i.join()

    print(f'主进程{time.time()-start_time}')


# join就是阻塞,主进程有join主进程下面的代码一律不执行

互斥锁

# 互斥锁:
# 多个任务共抢占一个资源时,想要顺序优先保障数据安全,一定要让其串行
# 现在的进程都并发的抢占输出
# 并发是以效率优先的,但是目前的需求是:顺序优先
# 多个进程共枪一个资源时,要保证顺畅优先:串行,一个一个来

# 并行或者并发
from multiprocessing import Process
import time
import random
import os
def task1():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')
def task2():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')
def task3():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')

if __name__ == '__main__':
    # 开启3个任务

    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

# 串行
# 利用join 解决了并行的问题,保证了顺序优先,但是这个谁先谁后是固定的,
# 这样不合理,争抢同一个资源的时候,应该是先到先得,保证公平
from multiprocessing import Process
import time
import random
import os
def task1():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')
def task2():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')
def task3():
    print(f'{os.getpid()}开始')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}打印结束了')

if __name__ == '__main__':
    # 开启3个任务

    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()



from multiprocessing import Process
import time
import random
import os
def task1(p):
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
def task2(p):
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
def task3(p):
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')

if __name__ == '__main__':
    # 开启3个任务

    p1 = Process(target=task1,args=('p1',))
    p2 = Process(target=task2,args=('p2',))
    p3 = Process(target=task3,args=('p3',))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()



from multiprocessing import Process
from multiprocessing import Lock #(锁)
import time
import random
import os
def task1(p,lock):
    # 上锁
    lock.acquire()
    # lock.acquire() 互斥锁上多个无法解开
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    # 解锁 释放掉
    lock.release()
def task2(p,lock):
    # 上锁
    lock.acquire()
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    # 解锁 释放掉
    lock.release()
def task3(p,lock):
    # 上锁
    lock.acquire()
    print(f'{p}开始')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    # 解锁 释放掉,再往下执行,开启强锁
    lock.release()

if __name__ == '__main__':
    # 开启3个任务
    # 开始上锁,添加参数lock
    mutex = Lock()
    p1 = Process(target=task1,args=('p1',mutex))
    p2 = Process(target=task2,args=('p2',mutex))
    p3 = Process(target=task3,args=('p3',mutex))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()


"""
三个任务同时抢占一把锁,先到先得,假如task2 先得到
然后执行下面的程序,此时的task1和task3也会抢锁,但是抢到后,发现已经上锁了,只能阻塞等待
遇到阻塞后,操作系统会强行将CPU切换到其他任务,其他任务发现锁还没有被打开,继续阻塞
直到task2 将锁释放掉,task1和task3继续争抢这把锁

如果一个子进程里面有多个锁,会变成死锁,无法继续执行
"""
"""
join和互斥锁的区别
共同点:都能将并行变成串行,保证了顺序
不同点:join人为的设定顺序,lock让其争抢顺序,保证了公平性
"""

from multiprocessing import Process
from multiprocessing import Lock
import time
import random

def task1(p,lock):
    lock.acquire()
    print(f'{p}开始打印了')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    lock.release()

def task2(p,lock):
    lock.acquire()
    print(f'{p}开始打印了')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    lock.release()

def task3(p,lock):
    lock.acquire()
    print(f'{p}开始打印了')
    time.sleep(random.randint(1,3))
    print(f'{p}打印结束了')
    lock.release()

if __name__ == "__main__":
    mutex = Lock()
    p1 = Process(target=task1,args=('p1',mutex))
    p2 = Process(target=task2,args=('p2',mutex))
    p3 = Process(target=task3,args=('p3',mutex))

    p2.start()
    p1.start()
    p3.start()
原文地址:https://www.cnblogs.com/zhangshan33/p/11390661.html