python 34 多进程(二)

1. 互斥锁

​ 当多个进程抢占同一数据时,将数据加锁,使进程按串行的方式去获取数据,先到先得,保证了公平、数据的安全。

​ lock.acquire() # 加锁

​ lock.release() # 释放

​ 死锁:连续lock.acquice() 多次,会阻塞进程。

# 模拟三个用户使用同一个打印机打印。
from multiprocessing import Process
from multiprocessing import Lock	# 导入互斥锁
import os
import time
import random
import sys

def task1(lock):

    lock.acquire()		# 加锁
    print(f'task1-{os.getpid()}开始打印!')
    time.sleep(random.randint(1,3))
    print(f'task1-{os.getpid()}打印结束!')
    lock.release()		# 解锁释放

def task2(lock):

    lock.acquire()
    print(f'task2-{os.getpid()}开始打印!')
    time.sleep(random.randint(1,3))
    print(f'task2-{os.getpid()}打印结束!')
    lock.release()

def task3(lock):

    lock.acquire()
    print(f'task3-{os.getpid()}开始打印!')
    time.sleep(random.randint(1,3))
    print(f'task3-{os.getpid()}打印结束!')
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,4):
        p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(mutex,))
        p.start()
# 优化,多个用户打印

from multiprocessing import Process
from multiprocessing import Lock
import os
import time
import random

def task(lock, i):

    lock.acquire()
    print(f'用户{i}:{os.getpid()}开始打印!')
    time.sleep(random.randint(1,3))
    print(f'用户{i}:{os.getpid()}打印结束!')
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,5):
        p = Process(target=task, args=(mutex, i))
        p.start()

Lock与join对比:

​ 相同点:都可以把并发变成串行,保证了顺序。

​ 不同点:join是人为设定的顺序;Lock是让其竞争顺序,保证公平性。

2. 进程之间的通信

​ 进程在内存级别是隔离的,但是文件在磁盘上是共享的。

2.1 基于文件的通信

当多个进程共同争抢一个数据、资源时,如果要保证顺序、数据的安全,必须要串行。

缺点:效率低;需人为加锁容易出现死锁。


# 模拟抢票系统,5个用户抢1张票。(查票时是并发的,但购票时是串行的)
# 文件ticket_json 中写入{"count":1}

from multiprocessing import Process
from multiprocessing import Lock
import time
import os
import random
import json

def search():		# 查看余票
    time.sleep(random.random())
    with open('ticket_json','r', encoding='utf-8') as f1:
        dic = json.load(f1)
        print(f'{os.getpid()}查看余票:{dic["count"]}')

def paid():			# 购票
    with open('ticket_json','r', encoding='utf-8') as f1:
        dic = json.load(f1)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(random.randint(1,2))
        with open('ticket_json','w', encoding='utf-8') as f2:
            json.dump(dic,f2)
        print(f'{os.getpid()}购票成功!')
    else:
        print(f"{os.getpid()}:已没票!")

def task(lock):		# 子进程
    search()
    lock.acquire()	#购票加锁
    paid()
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(6):		# 5个用户抢1张票
        p = Process(target=task, args=(mutex,))
        p.start()

2.2 基于队列的通信

队列:存在于内存,可以理解是一个容器。可以承载一些数据。

特性:先进先出,FIFO。

from multiprocessing import Queue

def func():
    print('is func')
class Q:
    pass
obj = Q()

q = Queue(4)	# 最大承载4个数据
q.put(1)		#添加数据到队列中
q.put([2])
q.put(func)
q.put(obj)
#q.put(111)  	# 超出会阻塞

for i in range(5):
    print(q.get())  # 依次取出数据,当没数据时,再get会阻塞

# 队列Queue中的一些方法参数
q = Queue(n) 	# maxsize = n 最大承载n个数据
q.qsize()		# 获取队列的元素个数
q.empty()        # 判断队列是否为空
q.full()              # 判断队列是否满
put(self, obj, block=True, timeout=None) 
get(self, block=True, timeout=None)
# 队列满时,再put会阻塞,直到某个进程get()数据时,会添加进去。
# 队列没数据时,再get会阻塞,直到某个进程put()数据时,会取出。

block = True : 默认阻塞。当写block=False时,只有遇到阻塞就会报错。
q.put(11,block=False)  	# 当队列满时,会报错
q.get(block=False)  	# 当队列满无数据时时,会报错

timeout = 3  # 阻塞3秒,3秒后还是阻塞状态就会报错。
# 用队列购票

from multiprocessing import Process
from multiprocessing import Queue
import os
import time
import random

def search(q):  # 查看余票
    print(f"用户-{os.getpid()}查看余票:{q.qsize()}票")

def paid(q):    # 购票
    if q.qsize() > 0:
        q.get()
        print(f"用户-{os.getpid()},购票成功")
    else:
        print(f"用户-{os.getpid()},购票失败")

def task(q):
    search(q)
    time.sleep(random.random())     # 网络延迟
    paid(q)

if __name__ == '__main__':
    q = Queue(10)
    for i in range(3):      # 3张票
        q.put(1)
    for i in range(5):      # 5个用户
        p = Process(target=task, args=(q,))
        p.start()
原文地址:https://www.cnblogs.com/yzm1017/p/11390986.html