并发编程(进程)——生产者消费者模型*****

一、生产者消费者模型

 1、理论:
-生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
-生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
-阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

2、代码实现:

  (1)初级生产者消费者模型 

import time
import random
from multiprocessing import Process, Queue


def producer(name, food, q):
    for i in range(10):
        data = '%s 制造了%s' % (name, food)
        # 模拟制造食物延迟
        time.sleep(random.randint(1, 3))
        print(data)
        q.put(food)


def consumer(name, q):
    while True:
        food = q.get()
        # 模拟吃食物延迟
        time.sleep(random.randint(1, 3))
        print('%s消费了%s' % (name, food))


if __name__ == '__main__':
    q = Queue()
    # 创造一个生产者
    p = Process(target=producer, args=('egon', '包子', q))
    p.start()

    # 创造一个消费者
    c = Process(target=consumer, args=('鸡哥',  q))
    c.start()


如果queue中没有数据了,消费者会一直卡住
import time
import random
from multiprocessing import Process, Queue

def producer(name, food, q):
    for i in range(10):
        data = '%s 制造了%s' % (name, food)
        # 模拟制造食物延迟
        time.sleep(random.randint(1, 3))
        print(data)
        q.put(food)
    q.put(None)


def consumer(name, q):
    while True:
        food = q.get()
        if food is None:return # 当队列中取出None,之间结束
        # 模拟吃食物延迟
        time.sleep(random.randint(1, 3))
        print('%s消费了%s' % (name, food))


if __name__ == '__main__':
    q = Queue()
    # 创造一个生产者
    p = Process(target=producer, args=('egon', '包子', q))
    p.start()

    # 创造一个消费者
    c = Process(target=consumer, args=('鸡哥',  q))
    c.start()

(2)制造两个消费者

import time
import random
from multiprocessing import Process, Queue

def producer(name, food, q):
    for i in range(10):
        data = '%s 制造了%s' % (name, food)
        # 模拟制造食物延迟
        time.sleep(random.randint(1, 3))
        print(data)
        q.put(food)


def consumer(name, q):
    while True:
        food = q.get()
        if food is None:return # 当队列中取出None,之间结束
        # 模拟吃食物延迟
        time.sleep(random.randint(1, 3))
        print('%s消费了%s' % (name, food))


if __name__ == '__main__':
    q = Queue()
    # 创造一个生产者
    p = Process(target=producer, args=('egon', '包子', q))
    p.start()

    # 创造一个消费者
    c = Process(target=consumer, args=('鸡哥',  q))
    c.start()
    c1 = Process(target=consumer, args=('王铁蛋',  q))
    c1.start()
    # 生产者生产完毕,放两个None
    p.join()  # 等待p进程执行完成再放
    q.put(None)
    q.put(None)

(3)多个生产者和多个消费者(有None版本)

import time
import random
from multiprocessing import Process, Queue

def producer(name, food, q):
    for i in range(10):
        data = '%s 制造了%s' % (name, food)
        # 模拟制造食物延迟
        time.sleep(random.randint(1, 3))
        print(data)
        q.put(food)


def consumer(name, q):
    while True:
        food = q.get()
        if food is None:return # 当队列中取出None,之间结束
        # 模拟吃食物延迟
        time.sleep(random.randint(1, 3))
        print('%s消费了%s' % (name, food))


if __name__ == '__main__':
    q = Queue()
    # 创造一个生产者
    p = Process(target=producer, args=('egon', '包子', q))
    p.start()

    p1 = Process(target=producer, args=('alex', '泔水', q))
    p1.start()
    # 创造一个消费者
    c = Process(target=consumer, args=('鸡哥',  q))
    c.start()
    c1 = Process(target=consumer, args=('王铁蛋',  q))
    c1.start()
    c2 = Process(target=consumer, args=('李铁柱',  q))
    c2.start()
    # 生产者生产完毕,放两个None
    p.join()  # 等待p进程执行完成再放
    p1.join()  # 等待p1(另一个生产者)进程执行完成再放

    q.put(None)
    q.put(None)
    q.put(None)

(4)多个生产者和多个消费者(无None 最终版本)

import time
import random
from multiprocessing import Process, Queue,JoinableQueue

def producer(name, food, q):
    for i in range(10):
        data = '%s 制造了%s' % (name, food)
        # 模拟制造食物延迟
        time.sleep(random.randint(1, 3))
        print(data)
        q.put(food)



def consumer(name, q):
    while True:
        food = q.get()
        # 模拟吃食物延迟
        time.sleep(random.randint(1, 3))
        print('%s消费了%s' % (name, food))
        q.task_done()  # 把队列中维护的数字减一




if __name__ == '__main__':
    # q = Queue()
    # 内部为何了一个数字,放一个数字会加一
    # 消费一个数字减一
    q = JoinableQueue()
    # 创造一个生产者
    p = Process(target=producer, args=('egon', '包子', q))
    p.start()

    p1 = Process(target=producer, args=('alex', '泔水', q))
    p1.start()


    # 创造一个消费者
    c = Process(target=consumer, args=('鸡哥',  q))
    # c.daemon = True
    c.start()

    c1 = Process(target=consumer, args=('王铁蛋',  q))
    # c1.daemon = True
    c1.start()
    c2 = Process(target=consumer, args=('李铁柱',  q))
    # c2.daemon = True
    c2.start()
    # 主结束,消费进程也结束,把每个消费进程都设置成守护进程

    # 等待所有生产者生产结束,主进程再结束
    p.join()
    p1.join()
    q.join()  # 会卡再者,一直等待q队列中数据没有了,才继续往下走
    print('生产者结束了,主进程结束')
"""
总结:
JoinableQueue()  #每放一个值,数字加一
q.task_done()  #取值减一
q.join()  #一直阻塞,当q没有值了,才继续走
"""

1 主进程中创建两个其它进程,实现主进程结束,两个子进程也自动结束(默写出来)

#守护进程
from multiprocessing import Process,current_process
import time
import os

def task():
    print(os.getpid())
    print('子进程')
    time.sleep(2000)
    print('子进程结束')


if __name__ == '__main__':
    t = Process(target=task, )
    t1 = Process(target=task, )
    # 守护进程:主进程一旦结束,子进程也结束
    t.daemon=True  # 一定要加在启动之前
    t1.daemon=True  # 一定要加在启动之前
    t.start()
    t1.start()


    time.sleep(1)
    print('主进程结束')
#守护进程

2 通过互斥锁,实现模拟抢票(**代码,默写出来)

from multiprocessing import Process, Lock
import json
import time
import random


def search():
    # 查票的函数
    # 打开文件,读出ticket_count
    with open('ticket', 'r', encoding='utf-8') as f:
        dic = json.load(f)
        print('余票还有:', dic.get('ticket_count'))


def buy():
    with open('ticket', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    time.sleep(random.randint(1, 3))  # 模拟一下网络延迟
    if dic.get('ticket_count') > 0:
        # 能够买票
        dic['ticket_count'] -= 1
        # 保存到文件中去
        with open('ticket', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
            print('买票成功')
    else:
        # 买票失败
        print('买票失败')


# 写一个函数,先查票,再买票

def task(mutex):
    search()
    # 买票过程要加锁
    # 买前加锁
    # mutex.acquire()
    # buy()  # 10个进程变成了串行执行
    # # 买后释放锁
    # mutex.release()
    with mutex:
        buy()


if __name__ == '__main__':
    # 锁的创建,在哪?主进程创建锁
    mutex = Lock()  # 创建一把锁
    # 模拟十个人买票(开10个进程)
    for i in range(10):
        t = Process(target=task, args=(mutex,))
        t.start()
方式一:原始方法
{"ticket_count": 1}
ticket文件 json格式

4 整理上下文管理器的使用,实现**代码(默写出来)

class MyClass():
    def __init__(self,file_name,mode,encoding):
        self.file_name=file_name
        self.mode=mode
        self.encoding=encoding

    def __enter__(self):
        print('只要有with,就会执行我')
        self.file=open(self.file_name,self.mode,encoding=self.encoding)

        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 只要顶格写代码,就会执行我
        print('只要顶格写代码,就会执行我')
        self.file.close()


with MyClass('ticket','r','utf-8') as f:
    print(f.read())
    print('xxss')
    print("sdfadasf")


# a=MyClass('ticket','r','utf-8')#这样做不会打印__enter__里的内容
方式一:加with方法

3 通过队列实现多个进程之间数据通信

from multiprocessing import Process, current_process, Queue
import time
import os


def task1(q):
    print('我是task1进程,我的id号是:%s'%os.getpid())
    q.put('lqz is handsome')


def task2(q):

    # res=q.get()
    # print('我是task2进程,我的id号是:%s'%os.getpid(),res)
    print('我是task2进程,我的id号是:%s'%os.getpid())


if __name__ == '__main__':
    q = Queue(5)

    t1 = Process(target=task1, args=(q,))
    t1.start()
    t2 = Process(target=task2, args=(q,))
    t2.start()

    print(q.get())
IPC机制

5 (进阶)通过多进程,实现TCP服务端支持多个客户端连接

import socket
from threading import Thread
from multiprocessing import Process
"""
服务端
    1.要有固定的IP和PORT
    2.24小时不间断提供服务
    3.能够支持并发
    
从现在开始要养成一个看源码的习惯
我们前期要立志称为拷贝忍者 卡卡西 不需要有任何的创新
等你拷贝到一定程度了 就可以开发自己的思想了
"""
server =socket.socket()  # 括号内不加参数默认就是TCP协议
server.bind(('127.0.0.1',8080))
server.listen(5)


# 将服务的代码单独封装成一个函数
def talk(conn):
    # 通信循环
    while True:
        try:
            data = conn.recv(1024)
            # 针对mac linux 客户端断开链接后
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

# 链接循环
while True:
    conn, addr = server.accept()  # 接客
    # 叫其他人来服务客户
    # t = Thread(target=talk,args=(conn,))
    t = Process(target=talk,args=(conn,))
    t.start()
#服务端
"""客户端"""
import socket


client = socket.socket()
client.connect(('127.0.0.1',8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data.decode('utf-8'))
#客户端

6(进阶)通过队列和多进程实现生产者消费者模型

from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def producer(name,food,q):
    for i in range(5):
        data = '%s生产了%s%s'%(name,food,i)
        # 模拟延迟
        time.sleep(random.randint(1,3))
        print(data)
        # 将数据放入 队列中
        q.put(data)


def consumer(name,q):
    # 消费者胃口很大 光盘行动
    while True:
        food = q.get()  # 没有数据就会卡住
        # 判断当前是否有结束的标识
        # if food is None:break
        time.sleep(random.randint(1,3))
        print('%s吃了%s'%(name,food))
        q.task_done()  # 告诉队列你已经从里面取出了一个数据并且处理完毕了


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer,args=('大厨egon','包子',q))
    p2 = Process(target=producer,args=('马叉虫tank','泔水',q))
    c1 = Process(target=consumer,args=('春哥',q))
    c2 = Process(target=consumer,args=('新哥',q))
    p1.start()
    p2.start()
    # 将消费者设置成守护进程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    # 等待生产者生产完毕之后 往队列中添加特定的结束符号
    # q.put(None)  # 肯定在所有生产者生产的数据的末尾
    # q.put(None)  # 肯定在所有生产者生产的数据的末尾
    q.join()  # 等待队列中所有的数据被取完再执行往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
    没当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候 才往后运行
    """
    # 只要q.join执行完毕 说明消费者已经处理完数据了  消费者就没有存在的必要了
生产消费模型

 ---38---

原文地址:https://www.cnblogs.com/guojieying/p/13554224.html