进程同步:生产者消费者模型 以及解决方法

背景

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

为什么要使用生产者和消费者模式:

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题,于是引入了生产者和消费者模式。

生产者——消费者模型 会遇到的问题

生产者——消费者模型中,生产者和消费者线程之间需要传递一定量的数据,两个线程会使用一个特定大小的共享环形缓冲器。

生产者向缓冲器中写入数据,直到它到达缓冲器的终点;然后它会再次从起点重新开始,覆盖已经存在的数据。消费者线程则会读取生成的数据。

在生产者——消费者实例中,对于同步的需求有两个部分:
1.如果生产者线程生成数据的速度太快,那么将会把消费者线程还没有读取的数据覆盖;
2.如果消费者线程读取数据的速度过快,那么它就会越过生产者线程而读取一些垃圾数据。

生产者——消费者模型 解决方法:

容易导致死锁的实现:

让生产者线程填满缓冲器,然后等待消费者线程读取完缓冲器中全部数据。

使用2个信号量 解决 单消费者单生产者的问题

使用两个信号量:freeSpace 与 usedSpace

freeSpace 信号量控制生产者线程写入数据的那部分缓冲器, usedSpace 信号量则控制消费者线程读取数据的那部分缓冲器区域。
这两个区域是相互补充的。

常用缓冲区容量值初始化 freeSpace 信号量,意味着它最多可以获取的缓冲器资源量。
在启动这个应用程序时,消费者线程就会获得自由的字节并把它们转换为用过的字节。
用0初始化usedSpace信号量,以确保消费者线程不会在一开始就读取到垃圾数据。

在生产者线程中,每次反复写入都是从获取一个 freeSpace 开始。
如果该缓冲器中充满了消费者线程还没有读取的数据,那么对acquire()的调用就会被阻塞,直到消费者线程开始消费这些数据。
一旦生产者线程获取这一字节,就写入数据,并将这个字节释放为 usedSpace ,以让消费者线程读取到。

在消费者线程中,我们从获取一个 usedSpace 开始。
如果缓冲器中还没有任何可用的数据,那么将会阻塞对acquire()调用,直到生产者线程生产数据。
一旦获取到这个字节,就使用数据,并把字节释放为 freeSpace ,这样,生产者线程就可以再次写入。

上述方法在只有一个生产者和一个消费者时能解决问题。对于多个生产者或者多个消费者共享缓冲区的情况,该算法也会导致竞争条件,出现两个或以上的进程同时读或写同一个缓冲区槽的情况。

解决多生产者、多消费者 的问题

为了保证同一时刻只有一个生产者能够执行 putItemIntoBuffer()。也就是说,需要寻找一种方法来互斥地执行临界区的代码。为了达到这个目的,可引入一个二值信号灯 mutex,其值只能为 1 或者 0。如果把线程放入 down(mutex) 和 up(mutex) 之间,就可以限制只有一个线程能被执行。


#include <QtCore/QCoreApplication>
#include <QThread>
#include <QSemaphore>
#include <QDebug>

const int SIZE = 5;
static unsigned char g_buff[SIZE] = {0};
static QSemaphore g_sem_free(SIZE);
static QSemaphore g_sem_used(0);
static QSemaphore mutex(1);

class Producer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            int value = qrand() % 256;

            g_sem_free.acquire();
            mutex.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( !g_buff[i] )
                {
                    g_buff[i] = (unsigned char)value;

                    qDebug() << objectName() << " generate: {" << i << ", " << value << "}";

                    break;
                }
            }
            mutex.release();

            g_sem_used.release();

            sleep(2);
        }
    }
};

class Customer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            g_sem_used.acquire();
            mutex.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( g_buff[i] )
                {
                    int value = g_buff[i];

                    g_buff[i] = 0;

                    qDebug() << objectName() << " consume: {" << i << ", " << value << "}";

                    break;
                }
            }
            mutex.release();

            g_sem_free.release();

            sleep(1);
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer p1;
    Producer p2;
    Producer p3;

    p1.setObjectName("p1");
    p2.setObjectName("p2");
    p3.setObjectName("p3");

    Customer c1;
    Customer c2;

    c1.setObjectName("c1");
    c2.setObjectName("c2");

    p1.start();
    p2.start();
    p3.start();

    c1.start();
    c2.start();

    return a.exec();
}

使用 互斥量 + 条件变量 解决 多生产者多消费者的问题

#include <QtCore/QCoreApplication>
#include <QWaitCondition>
#include <QThread>
#include <QMutex>
#include <iostream>

const int DataSize = 100;
const int BufferSize = 1;
// static char buffer[BufferSize];

static QWaitCondition bufferIsNotFull;
static QWaitCondition bufferIsNotEmpty;
static QMutex mutex;
static int usedSpace;

class Producer : public QThread
{
protected:
    void run()
    {
        for (int i = 0; i < DataSize; ++i)
        {
            mutex.lock();
            while (usedSpace == BufferSize)
            {
                bufferIsNotFull.wait(&mutex);
            }
            std::cerr<<"P";
            ++usedSpace;
            bufferIsNotEmpty.wakeAll();
            mutex.unlock();
        }
    }
};

class Consumer : public QThread
{
protected:
    void run()
    {
        for (int i = 0; i < DataSize; ++i)
        {
            mutex.lock();
            while (usedSpace == 0)
            {
                bufferIsNotEmpty.wait(&mutex);
            }
            std::cerr<<"C";
            --usedSpace;
            bufferIsNotFull.wakeAll();
            mutex.unlock();
        }
        std::cerr<<std::endl;
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return a.exec();
}


原文地址:https://www.cnblogs.com/schips/p/12533081.html