多线程编程之生产者消费者问题

  生产者-消费者问题(Producer-consumer problem),也称作有限缓冲问题(Bounded-buffer problem),是多线程领域的一个经典问题,可以描述为:两个或者更多个线程共享同一个缓冲区,其中一个或多个作为“生产者”会不断地向缓冲区中添加数据,另外的一个或者多个作为“消费者”从缓冲区中取走数据。这个问题的关键在于:要保证缓冲区满了之后“生产者”不能再继续添加数据,而缓冲区空了之后“消费者”不能再取走数据了。

  这个问题在多个“生产者”和“消费者”的情况下肯定要麻烦一点,所以先看一下只有一个“生产者”和一个“消费者”以及一个元素缓冲区的情况。这时候情况可以简化为:

  1. 从缓冲区中取走数据和向缓冲区中添加数据需要互斥操作保持同步,所以,这里需要用临界区或者互斥量来实现;
  2. 生产者要等待缓冲区“有空间”(由于这里缓冲区只有一个元素,所以等价于要等待缓冲区为空)才能添加数据,同样消费者也不能在缓冲区为空的时候取数据。这两个过程都需要事件或者信号量来通知进行。

  考虑好了上述两点要求,就可以设计出如下思路算法:

/* 针对生产者的算法1: */
{   WaitForSingleObject(hEmpty, INFINITE);   WaitForSingleObject(hMutex, INIFINITE);   /* 生产者的活动 */   ReleaseMutex(hMutex);   ReleaseSemaphore(hFull, 1, NULL);
}
/* 针对消费者的算法1: */
{
  WaitForSingleObject(hFull, INFINITE);   WaitForSingleObject(hMutex, INIFINITE);   /* 消费者的活动 */   ReleaseMutex(hMutex);   ReleaseSemaphore(hEmpty, 1, NULL);
}

   当然,生产者和消费者的互斥操作不用hMutex而改用EnterCriticalSection临界区也是一样的。举一个实例看看如何应用生产者-消费者算法:

typedef struct _MESSAGE_QUEUE    /* 消息队列的数据结构 */
{
    int threadId;
    int msgType[MAX_NUMBER];
    int count;
    HANDLE hFull;
    HANDLE hEmpty;
    HANDLE hMutex;
}MESSAGE_QUEUE;

/* 发送消息,类似于“生产者” */
void send_mseesge(int threadId, MESSAGE_QUEUE* pQueue, int msg)    
{
    assert(NULL != pQueue);
    
    if(threadId != pQueue->threadId)
        return;

    WaitForSingleObject(pQueue->hEmpty, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    pQueue->msgType[pQueue->count ++] = msg;
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hFull, 1, NULL);    
}

/* 接收消息,类似于“消费者” */
void get_message(MESSAGE_QUEUE* pQueue, int* msg)
{
    assert(NULL != pQueue && NULL != msg);

    WaitForSingleObject(pQueue->hFull, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    *msg = pQueue->msgType[pQueue->count --];
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hEmpty, 1, NULL);   
}
View Code

   搞清楚了一个生产者和一个消费者以及一个元素的缓冲区的简单模式,下面再看看如果消费者有两个而缓冲区可以容纳四个元素的情况:

//1生产者,2消费者,4缓冲区
#include <stdio.h>
#include <process.h>
#include <windows.h>

const int END_PRODUCE_NUMBER = 8;  // 生产产品个数
const int BUFFER_SIZE = 4;          // 缓冲区个数
int g_Buffer[BUFFER_SIZE];          // 缓冲池
int g_i, g_j;
CRITICAL_SECTION g_cs;    // 信号量与关键段
HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;

// 生产者线程函数
unsigned int __stdcall ProducerThreadFun(PVOID pM)
{
    for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    {
        // 等待“缓冲区有剩余空间”的信号!
        WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE);

        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        g_Buffer[g_i] = i;
        g_i = (g_i + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        // 通知消费者“缓冲区有数据”了!
        ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    }
    printf("生产者完成任务,线程结束运行
");
    return 0;
}

// 消费者线程函数
unsigned int __stdcall ConsumerThreadFun(PVOID pM)
{
    while (true)
    {
        // 等待“缓冲区中有数据”的信号
        WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE);
        
        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        if (g_Buffer[g_j] == END_PRODUCE_NUMBER) // 结束标志
        {
            LeaveCriticalSection(&g_cs);
            // 通知其它消费者有新数据了(结束标志)
            ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
            break;
        }
        g_j = (g_j + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        Sleep(50);     // some other work to do
        ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);      // 给缓冲区增加一个空间
    }

    return 0;
}
int main()
{
    InitializeCriticalSection(&g_cs);
    // 初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数
    g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);    // 指定缓冲区初始状态存在四个“剩余空间”
    g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);
    g_i = 0;
    g_j = 0;
    memset(g_Buffer, 0, sizeof(g_Buffer));

    const int THREADNUM = 3;
    HANDLE hThread[THREADNUM];
    //生产者线程
    hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    //消费者线程
    hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    for (int i = 0; i < THREADNUM; i++)
        CloseHandle(hThread[i]);

    // 销毁信号量和关键段
    CloseHandle(g_hSemaphoreBufferEmpty);
    CloseHandle(g_hSemaphoreBufferFull);
    DeleteCriticalSection(&g_cs);
    return 0;
}    
View Code

  上面的代码思路很简单,一个生产者就一直等待“缓冲区有剩余空间”这个信号,而两个消费者就一直等待“缓冲区有数据”这个信号就行了!操作缓冲区的时候采取Mutex互斥操作防止冲突。注意,在创建信号量的初期要指定初始信号量的个数,这个信号量个数决定了缓冲区的大小。每一次WaitForSingleObject都会使信号量减一,而每一次ReleaseSemaphore都会使信号量加一。

  

小结:

  “生产者-消费者”问题只需考虑两个方面即可:

  1. 生产者和消费者要对缓冲区互斥操作
  2. 生产者要等待“缓冲区有空间”这个信号才能添加数据,消费者要等待“缓冲区有数据”这个信号才能取出数据。
原文地址:https://www.cnblogs.com/kuliuheng/p/4070851.html