进程之间的通信

互斥锁

from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()    上锁
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release()    解锁
if __name__ == '__main__':
    lock=Lock()      造锁(实例化一个锁)
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
调用“锁”的方式

示例背景

三台电脑同一时刻共同调用打印机完成打印任务,即三个进程同一时刻共抢同一个资源(输出平台)

多个进程共抢一个资源,应结果第一位,效率第二位,所以应该牺牲效率,保求结果(串行)

示例代码

以上的代码虽然完成串行结果,但没有实现公平,执行的顺序都是人为写好的,应该做到公平的抢占资源,谁先抢到就执行谁

from multiprocessing import Process
from multiprocessing import Lock
import time
import random


def task1(lock):
    print('task1')  # 验证cpu遇到io切换了
    lock.acquire()
    print('task1: 开始打印')
    time.sleep(random.randint(1, 3))
    print('task1: 打印完成')
    lock.release()

def task2(lock):
    print('task2')  # 验证cpu遇到io切换了
    lock.acquire()
    print('task2: 开始打印')
    time.sleep(random.randint(1, 3))
    print('task2: 打印完成')
    lock.release()


def task3(lock):
    print('task3') # 验证cpu遇到io切换了
    lock.acquire()
    print('task3: 开始打印')
    time.sleep(random.randint(1, 3))
    print('task3: 打印完成')
    lock.release()


if __name__ == '__main__':

    lock = Lock()

    p1 = Process(target=task1, args=(lock,))
    p2 = Process(target=task2, args=(lock,))
    p3 = Process(target=task3, args=(lock,))

    p1.start()
    p2.start()
    p3.start()
正确代码

上锁时一定要给进程上同一把锁,而且上锁一次,就一定要解锁一次

 互斥锁和join的区别

共同点

都是完成了进程之间的串行

区别

join是通过人为控制使进程串行

互斥锁是随机抢占资源,保证了公平性

业务需求分析:

买票之前先要查票,必须经历的流程: 你在查票的同时,100个人也在查本此列票.
买票时,你要先从服务端获取到票数,票数>0 ,买票,然后服务端票数减一. 中间肯定有网络延迟.
from multiprocessing import Process
from multiprocessing import Lock
import time
import json
import os
import random
多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离.
它们可以共同操作一个文件.

def search():
    time.sleep(random.random())
    with open('db.json',encoding='utf-8') as f1:
        dic = json.load(f1)
    print(f'剩余票数{dic["count"]}')


def get():
    with open('db.json',encoding='utf-8') as f1:
        dic = json.load(f1)
    time.sleep(random.randint(1,3))
    if dic['count'] > 0:
        dic['count'] -= 1
        with open('db.json', encoding='utf-8', mode='w') as f1:
            json.dump(dic,f1)
        print(f'{os.getpid()}用户购买成功')
    else:
        print('没票了.....')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        p = Process(target=task,args=(lock,))
        p.start()
模拟抢票系统

队列

进程之间的通信最好的方式是基于队列

from multiprocessing import Process
from multiprocessing import Queue
import time
import os
import random
# 多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离.
# 它们可以共同操作一个文件.

def search(ticket):
    time.sleep(random.random())
    print(f'剩余票数{ticket}')


def _get(q):
    dic = q.get()
    time.sleep(random.randint(1,3))
    if dic['count'] > 0:
        dic['count'] -= 1
        print(f'{os.getpid()}用户购买成功')
    else:
        print('没票了.....')
    q.put(dic)

def task(ticket,q):
    search(ticket)
    _get(q)


if __name__ == '__main__':
    q = Queue(1)
    q.put({'count': 1})
    ticket = 1
    for i in range(5):
        p = Process(target=task,args=(ticket,q))
        p.start()
通过队列模拟抢票系统

什么是队列

队列是存在与内存中的一个容器,最大的特点:先进先出(FIFO)

利用队列进行进程之间的通信,简单,方便,不用自己手动加锁,队列自带优质阻塞,可持续化取数据

from multiprocessing import Queue

q = Queue(3)  # 可以设置元素个数

def func():
     print('in func')

q.put('alex')
q.put({'count': 1})
q.put(func)
q.put(666)  # 当队列数据已经达到上限,在插入数据的时候,程序就会夯住.
                     当你将数据全部取完继续再取值时,程序也会夯住

put中有block参数 默认为True 当你插入的数据超过最大限度,默认阻塞.改成False 数据超过最大限度,不阻塞了直接报错.

put中有timeout参数 延时报错,比如q.put(3,timeout = 3) 超过三秒再put不进,程序就会报错

get也有这两个参数
队列的基本方法
# 小米:抢手环4.预期发售10个.
# 有100个人去抢.
import os
from multiprocessing import Queue
from multiprocessing import Process

def task(q):
    try:
        q.put(f'{os.getpid()}',block=False)
    except Exception:
        return


if __name__ == '__main__':
    q = Queue(10)
    for i in range(100):
        p = Process(target=task,args=(q,))
        p.start()
    for i in range(1,11):
        print(f'排名第{i}的用户: {q.get()}',)
用进程通信队列模拟实例

栈与队列性质类似,特点与队列相反:先进后出(FILO)

import queue

q = queue.LifoQueue()
q.put(1)
q.put(3)
q.put('barry')

print(q.get())
print(q.get())
print(q.get())
print(q.get())
栈的基本方法

优先级队列

需要通过元组的形式放入,(int,数据) int 代表优先级,数字越低,优先级越高

import queue
q = queue.PriorityQueue(3)

q.put((10, '垃圾消息'))
q.put((-9, '紧急消息'))
q.put((3, '一般消息'))

print(q.get())
print(q.get())
print(q.get())
队列的基本方法

生产者消费者模型(多应用于并发)

生产者:生产数据进程

消费者:对生产者生产出来的数据做进一步处理进程

from multiprocessing import Process
from multiprocessing import Queue
import time
import random


def producer(name,q):
    for i in range(1,6):
        time.sleep(random.randint(1,3))
        res = f'{i}号包子'
        q.put(res)

        print(f'33[0;32m 生产者{name}: 生产了{res}33[0m')



def consumer(name,q):
    while 1:
        try:
            time.sleep(random.randint(1,3))
            ret = q.get(timeout=5)
            print(f'消费者{name}: 吃了{ret}')
        except Exception:
            return




if __name__ == '__main__':

    q = Queue()

    p1 = Process(target=producer, args=('alex',q))
    p2 = Process(target=consumer, args=('barry',q))

    p1.start()
    p2.start()

 模型中有三个主体:生产者,消费者,容器队列

合理的调控多个进程去生成数据以及提取数据,中间有个必不可少的环节就是容器队列

如果没有容器,生产者与消费者形成强耦合性,不合理,所以要有一个缓冲区(容器),平衡了生产力与消费力

总结

进程之间的通信

基于文件 + 锁    效率低,麻烦

基于队列       推荐

基于管道       管道自己加锁,底层可能会出现数据丢失损坏

应用

多个进程抢占一个资源

串行,有序以及数据安全

多个进程实现并发效果

生产者消费者模型

原文地址:https://www.cnblogs.com/biulo/p/11232892.html