生产者消费者问题

问题描述:

生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

条件变量解决方案:

  • 基于队列构建一个缓冲区,生产者在队尾填充,消费者在队头获取。队列缓冲区作为多个线程的共享资源。
  • 由于多个消费者和生产者线程可以并发访问缓冲区,需要互斥锁来控制对缓冲区的互斥访问。
  • 队列空时消费者线程需要等到队列中存在资源、队列满时生产者线程需要等到队列中有资源被消费。通过使用条件变量来实现线程的阻塞、通知以达到生产、消费线程的同步。
from threading import Lock
from threading import Condition
import threading

class myQueue:
    def __init__(self, size):
        self.size = size
        self.list = list()
        self.lock = Lock()
        self.notFullCond = Condition(self.lock)
        self.notEmptyCond = Condition(self.lock)

    def isFull(self):
        if self.size == len(self.list):
            return True
        return False

    def isEmpty(self):
        if 0 == len(self.list):
            return True
        return False
    
    def enQueue(self, elem):
        self.lock.acquire()
        while self.isFull(): #队列满时触发等待notFullCond条件,线程阻塞同时释放互斥锁
            print('queue is full, waiting...')
            self.notFullCond.wait()   
        print(threading.current_thread().getName() + ' product ' + str(elem))
        self.list.append(elem)
#当有资源进入队列通知所有等待notEmptyCond条件的线程,等释放互斥锁后,等待notEmptyCond条件的线程获取锁,再次判断条件
        self.notEmptyCond.notify_all()
        self.lock.release()

    def deQueue(self):
        self.lock.acquire()
        while self.isEmpty(): #队列空时触发等待notEmptyCond条件,线程阻塞同时释放互斥锁
            print('queue is empty, waiting...')
            self.notEmptyCond.wait()
        elem = self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + ' consume ' + str(elem))
#当有资源出队列通知所有等待notFullCond条件的线程,等释放互斥锁后,等待notFullCond条件的线程获取锁,再次判断条件
        self.notFullCond.notify_all()
        self.lock.release()

        return elem

信号量解决方案:

  • 通过信号量来控制线程的同步,信号量管理可以获得资源的个数,初始队列为空,写信号量资源个数为队列长度,读信号量资源个数为0
from threading import Lock
from threading import Semaphore
import threading

class mySemQueue:
    def __init__(self, size):
        self.size = size
        self.list = list()
        self.lock = Lock()
        self.writeSem = Semaphore(size)#初始化写信号量
        self.readSem  = Semaphore(0)   #初始化读信号量
    
    def enQueue(self, elem):
        self.writeSem.acquire()        #资源入队申请写信号量,如果为0则阻塞
        
        self.lock.acquire()        #互斥锁来保证资源的互斥访问
        self.list.append(elem)
        print(threading.current_thread().getName() + ' product ' + str(elem))
        self.lock.release()

        self.readSem.release()         #资源入队后释放一个读信号量,如果其它线程阻塞在这个信号量上,唤醒该线程

    def deQueue(self):
        self.readSem.acquire()         #资源出队申请读信号量,如果为0则阻塞

        self.lock.acquire()
        elem = self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + ' consume ' + str(elem))
        self.lock.release()
        
        self.writeSem.release()        #资源出队后释放一个写信号量,如果其它线程阻塞在这个信号量上,唤醒该线程
        
        return elem
  • 测试
from threading import Thread
import sys
import threading

class myThread(Thread):
    def __init__(self, func):
        Thread.__init__(self)
        self.func = func

    def run(self):
        print(threading.current_thread().getName() + ' start')
        self.func()

from myThread import myThread
from myQueue import myQueue
import random
import sys

def producter():
    while True:
        elem =random.randint(1, 100)
        que.enQueue(elem)

def consumer():
    while True:
        que.deQueue()

fp = open('log.txt','w')
sys.stdout = fp

que = myQueue(10)
t1 = myThread(producter)
t2 = myThread(consumer)
t3 = myThread(consumer)
t1.start()
t2.start()
t3.start()
原文地址:https://www.cnblogs.com/chencheng/p/2893421.html