[线程同步]生产者消费者代码实现

生产者消费者问题是一个著名的线程同步问题,该问题描述如下:
有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。这样情况就简便多了。

1. 从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段和互斥量来完成。
2. 生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。
并且由于有两个等待过程,所以要用两个事件或信号量来控制。

#include <stdio.h>
#include <process.h>
#include <windows.h>

CRITICAL_SECTION g_csThreadCode;
HANDLE g_EventBuffFull;
HANDLE g_EventBuffEmpty;
int g_buffer;

const int THREAD_NUM = 2;
const int product_num = 100;

unsigned int __stdcall producer_thread(void *param)
{
	for (int i = 0; i < product_num; i++)
	{
		WaitForSingleObject(g_EventBuffEmpty, INFINITE);
		
		EnterCriticalSection(&g_csThreadCode);
		g_buffer = i;
		printf("生产者在缓冲区中放置一个产品,编号%d
", g_buffer);
		LeaveCriticalSection(&g_csThreadCode);
		
		SetEvent(g_EventBuffFull);
	}
	
	return 0;
}
unsigned int __stdcall consumer_thread(void *param)
{
	for (int i = 0; i < product_num; i++)
	{
		WaitForSingleObject(g_EventBuffFull, INFINITE);
		
		EnterCriticalSection(&g_csThreadCode);
		printf("	消费者取走%d产品
", g_buffer);
		g_buffer = 0;
		LeaveCriticalSection(&g_csThreadCode);
		
		SetEvent(g_EventBuffEmpty);
	}
	
	Sleep(10);
	return 0;
}

int main()
{
	InitializeCriticalSection(&g_csThreadCode);
	g_EventBuffEmpty = CreateEvent(NULL, false, true, NULL);
	g_EventBuffFull = CreateEvent(NULL, false, false, NULL);

	HANDLE handle[THREAD_NUM];
	handle[0] = (HANDLE)_beginthreadex(NULL, 0, producer_thread, NULL, 0, NULL);
	handle[1] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL);

	WaitForMultipleObjects(THREAD_NUM, handle, true, INFINITE);

	//销毁同步资源
	DeleteCriticalSection(&g_csThreadCode);
	CloseHandle(handle[0]);
	CloseHandle(handle[1]);
	CloseHandle(g_EventBuffFull);
	CloseHandle(g_EventBuffEmpty);
	return 0;
}

代码执行结果:

可以看出生产者与消费者已经是有序的工作了。


 先给出伪代码:

semaphore mutex=1; //临界区互斥信号量
semaphore empty=n;  //空闲缓冲区
semaphore full=0;  //缓冲区初始化为空
producer () { //生产者进程
    while(1){
        produce an item in nextp;  //生产数据
        P(empty);  //获取空缓冲区单元
        P(mutex);  //进入临界区.
        add nextp to buffer;  //将数据放入缓冲区
        V(mutex);  //离开临界区,释放互斥信号量
        V(full);  //满缓冲区数加1
    }
}
consumer () {  //消费者进程
    while(1){
        P(full);  //获取满缓冲区单元
        P(mutex);  // 进入临界区
        remove an item from buffer;  //从缓冲区中取出数据
        V (mutex);  //离开临界区,释放互斥信号量
        V (empty) ;  //空缓冲区数加1
        consume the item;  //消费数据
    }
}

然后再对这个简单生产者消费者问题加大难度。将消费者改成2个,缓冲池改成拥有4个缓冲区的大缓冲池。

代码实现:

#include <stdio.h>
#include <process.h>
#include <windows.h>

const int THREAD_NUM = 3;
const int BUFFER_SIZE = 4;
int g_buffer[BUFFER_SIZE];
int g_i;
int g_j;
const int product_num = 10;
//所有产品都被取走,消费者线程结束标志
bool g_isOver;

HANDLE g_hSemaphoreFull;
HANDLE g_hSemaphoreEmpty;
CRITICAL_SECTION g_csThreadCode;

unsigned int __stdcall producer_thread(void *param)
{
	for (int i = 0; i < product_num; i++)
	{
		WaitForSingleObject(g_hSemaphoreEmpty, INFINITE);

		EnterCriticalSection(&g_csThreadCode);
		g_buffer[g_i] = i;
		printf("%d位置上放置产品%d
", g_i, g_buffer[g_i]);
		g_i = (g_i + 1) % BUFFER_SIZE;
		LeaveCriticalSection(&g_csThreadCode);

		ReleaseSemaphore(g_hSemaphoreFull, 1, NULL);
	}
	printf("生产者任务完成,线程结束
");
	return 0;
}
unsigned int __stdcall consumer_thread(void *param)
{
	while (true)
	{
		WaitForSingleObject(g_hSemaphoreFull, INFINITE);
		if (g_isOver)
			break;

		EnterCriticalSection(&g_csThreadCode);
		printf("	线程%d在%d位置取走产品%d
", GetCurrentThreadId(), g_j, g_buffer[g_j]);
		//所有的产品都被取走,这时所有的消费者线程应该结束
		if (g_buffer[g_j] == product_num - 1)
		{
			g_isOver = true;
			LeaveCriticalSection(&g_csThreadCode);
			ReleaseSemaphore(g_hSemaphoreFull, 1, NULL);
			printf("	所有的产品都已经被消费者取走,消费者线程结束
");
			break;
		}
		g_j = (g_j + 1) % BUFFER_SIZE;
		LeaveCriticalSection(&g_csThreadCode);

		Sleep(50);

		ReleaseSemaphore(g_hSemaphoreEmpty, 1, NULL);
	}//while

	//printf("生产者任务完成,线程结束
");
	return 0;
}

int main()
{
	//初始化变量和内核对象
	InitializeCriticalSection(&g_csThreadCode);
	g_hSemaphoreEmpty = CreateSemaphore(NULL, 4, 4, NULL);
	g_hSemaphoreFull  = CreateSemaphore(NULL, 0, 4, NULL);
	g_i = 0;
	g_j = 0;
	memset(g_buffer, 0, sizeof(g_buffer));

	HANDLE handle[THREAD_NUM];

	handle[1] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL);
	handle[2] = (HANDLE)_beginthreadex(NULL, 0, consumer_thread, NULL, 0, NULL);

	handle[0] = (HANDLE)_beginthreadex(NULL, 0, producer_thread, NULL, 0, NULL);

	WaitForMultipleObjects(THREAD_NUM, handle, true, INFINITE);

	//释放资源
	for (int i = 0; i < THREAD_NUM; i++)
	{
		CloseHandle(handle[i]);
	}
	DeleteCriticalSection(&g_csThreadCode);
	CloseHandle(g_hSemaphoreEmpty);
	CloseHandle(g_hSemaphoreFull);

	return 0;
}

执行结果:

输出结果证明各线程的同步和互斥已经完成了。

至此,生产者消费者问题已经圆满的解决了,下面作个总结:

1. 首先要考虑生产者与消费者对缓冲区操作时的互斥。

2. 不管生产者与消费者有多少个,缓冲池有多少个缓冲区。都只有二个同步过程——分别是生产者要等待有空缓冲区才能投放产品,消费者要等待有非空缓冲区才能去取产品。

  

原文地址:https://www.cnblogs.com/stemon/p/4857481.html