生产者消费者问题

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <windows.h>
/*最大临界区数*/
#define MAX_BUFFER_NUM 20
/*秒到毫秒的乘法因子*/
#define INTE_PER_SEC 1000
/*生产与消费线程的总数*/
#define MAX_THREAD_NUM 64
/*每个线程的参数*/
typedef struct
{
    int serial;/*线程序列号*/
    char entity;/*线程标识*/
    double delay;/*线程延迟*/
    int thread_request[MAX_THREAD_NUM];/*线程请求队列*/
    int n_request;/*请求个数*/
} ThreadInfo;
/*临界区对象的声明,用于管理缓冲区的互斥访问*/
CRITICAL_SECTION PC_Critical[MAX_BUFFER_NUM];
int Buffer_Critical[MAX_BUFFER_NUM];/*缓冲区声明,用于存放产品*/
HANDLE h_Thread[MAX_THREAD_NUM];/*每个线程句柄的数组*/
ThreadInfo Thread_Info[MAX_THREAD_NUM];/*线程信息数组*/
HANDLE empty_semaphore;/*一个信号量*/
HANDLE h_mutex;/*一个互斥量*/
DWORD n_Thread=0;/*实际线程数目*/
DWORD n_Buffer_or_Critical;/*临界区数目*/
HANDLE h_Semaphore[MAX_THREAD_NUM];/*生产者允许消费者开始消费的信号量*/

void Produce(void *p);
void Consume(void *p);
bool IfInOtherRequest(int);
int  FindProducePosition();
int  FindBufferPosition(int);

int main()
{
    int i,j,temp,len;
    char c;
    char Str[30];
    DWORD wait_for_all;
    /*初始化缓冲区*/
    for(i=0; i<MAX_BUFFER_NUM; ++i)
    {
        Buffer_Critical[i]=-1;
    }
    /*初始化每个线程的请求队列*/
    for(i=0; i<MAX_THREAD_NUM; ++i)
    {
        for(j=0; j<MAX_THREAD_NUM; ++j)
        {
            Thread_Info[i].thread_request[j]=-1;
        }
        Thread_Info[j].n_request=0;
    }
    /*初始化临界区*/
    for(i=0; i<MAX_THREAD_NUM; ++i)
    {
        InitializeCriticalSection(&PC_Critical[i]);
    }
    /*打开输入文件*/
    freopen("test1.txt","r",stdin);
    scanf("%lu%*c",&n_Buffer_or_Critical);
    while(scanf("%d",&Thread_Info[n_Thread].serial)!=EOF)
    {
        scanf("%*c%c%lf",&Thread_Info[n_Thread].entity,&Thread_Info[n_Thread].delay);
        c=getchar();
        while(c!='\n')
        {
            scanf("%d",&Thread_Info[n_Thread].thread_request[Thread_Info[n_Thread].n_request++]);
            c=getchar();
        }
        ++n_Thread;
    }
    fclose(stdin);
    /*回显信息*/
//    printf("输入文件是:\n");
//    printf("%d\n",(int)n_Buffer_or_Critical);
//    for(i=0; i<(int)n_Thread; ++i)
//    {
//        printf("thread %2d   %c   %f",Thread_Info[i].serial,Thread_Info[i].entity,Thread_Info[i].delay);
//        for(j=0; j<Thread_Info[i].n_request; ++j)
//        {
//            printf(" %d",Thread_Info[i].thread_request[j]);
//        }
//        printf("\n");
//    }
    /*创建信号量*/
    empty_semaphore=CreateSemaphore(NULL,n_Buffer_or_Critical,n_Buffer_or_Critical,"semaphore_for_empty");
    h_mutex=CreateMutex(NULL,FALSE,"mutex_for_update");
    /*命名*/
    for(i=0; i<(int)n_Thread; ++i)
    {
        strcpy(Str,"semaphore_for_produce_");
        len=strlen(Str);
        temp=i;
        do
        {
            Str[len++]=temp%10+'0';
            temp/=10;
        }
        while(temp);
        Str[len]='\0';
        h_Semaphore[i+1]=CreateSemaphore(NULL,0,n_Thread,Str);
    }
    /*创建生产者和消费者线程*/
    for(i=0; i<(int)n_Thread; ++i)
    {
        if(Thread_Info[i].entity=='P')
        {
            h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Produce,&(Thread_Info[i]),0,NULL);
        }
        else
        {
            h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Consume,&(Thread_Info[i]),0,NULL);
        }
    }
    /*主程序等待各个线程的动作结束*/
    wait_for_all=WaitForMultipleObjects(n_Thread,h_Thread,TRUE,-1);
    printf("\n\n全部生产者和消费者都已完成它们的工作.\n按任意键返回!\n");
    return 0;
}
/*确认是否还有对同一产品的消费请求未执行*/
bool IfInOtherRequest(int req)
{
    int i,j;
    for(i=0; i<n_Thread; ++i)
    {
        for(j=0; j<Thread_Info[i].n_request; ++j)
        {
            if(Thread_Info[i].thread_request[j]==req)
            {
                return TRUE;
            }
        }
    }
    return FALSE;
}
/*找出当前可以进行产品生产的空缓冲区位置*/
int FindProducePosition()
{
    int EmptyPosition,i;
    for(i=0; i<n_Buffer_or_Critical; ++i)
    {
        if(Buffer_Critical[i]==-1)
        {
            EmptyPosition=i;
            Buffer_Critical[i]=-2;
            break;
        }
    }
    return EmptyPosition;
}
/*找出当前所需生产者生产的产品的位置*/
int FindBufferPosition(int Propos)
{
    int TempPos,i;
    for(i=0; i<n_Buffer_or_Critical; ++i)
    {
        if(Buffer_Critical[i]==Propos)
        {
            TempPos=i;
            break;
        }
    }
    return TempPos;
}
/*生产者进程*/
void Produce(void *p)
{
    /*声明变量*/
    DWORD wait_for_semaphore,wait_for_mutex,m_delay;
    int m_serial;
    /*获取线程的信息*/
    m_serial=((ThreadInfo*)(p))->serial;
    m_delay=(DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC);
    Sleep(m_delay);
    /*开始请求生产*/
    printf("生产者 %2d 发送生产请求信号.\n",m_serial);
    /*确认有空缓冲区可供生产,同时将空位置数empty减1,用于生产者和消费者的同步*/
    wait_for_semaphore=WaitForSingleObject(empty_semaphore,-1);
    /*互斥访问下一个可用于生产的空临界区,实现写互斥*/
    wait_for_mutex=WaitForSingleObject(h_mutex,-1);
    int ProducePos=FindProducePosition();
    ReleaseMutex(h_mutex);
    /*生产者在获得自己的空位置并做上标记后,以下的写操作在生产者之间可以并发*/
    /*核心生产步骤中,程序将生产者的ID作为产品编号放入,方便消费者识别*/
    printf("生产者 %2d 开始在缓冲区 %2d 生产产品.\n",m_serial,ProducePos);
    Buffer_Critical[ProducePos]=m_serial;
    printf("生产者 %2d 完成生产过程:\n",m_serial);
    printf("\t缓冲区[%2d]:%3d\n",ProducePos,Buffer_Critical[ProducePos]);
    /*使生产者写的缓冲区可以被多个消费者使用,实现读写同步*/
    ReleaseSemaphore(h_Semaphore[m_serial],n_Thread,NULL);
    return;
}
/*消费者进程*/
void Consume(void *p)
{
    /*声明变量*/
    DWORD wait_for_semaphore,m_delay;
    /*消费者的序列号,和请求的数目,缓冲区位置*/
    int i,m_serial,m_requestNum,BufferPos;
    /*本消费线程的请求队列*/
    int m_thread_request[MAX_THREAD_NUM];
    /*获取线程信息*/
    m_serial=((ThreadInfo*)(p))->serial;
    m_delay=(DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC);
    m_requestNum=((ThreadInfo*)(p))->n_request;
    for(i=0; i<m_requestNum; ++i)
    {
        m_thread_request[i]=((ThreadInfo*)(p))->thread_request[i];
    }
    Sleep(m_delay);
    /*循环开始消费*/
    for(i=0; i<m_requestNum; ++i)
    {
        /*请求消费下一个产品*/
        printf("消费者 %2d请求消费 %2d产品\n",m_serial,m_thread_request[i]);
        /*没生产则等待,否则允许书减1,读写同步*/
        wait_for_semaphore=WaitForSingleObject(h_Semaphore[m_thread_request[i]],-1);
        /*查询所需产品放到缓冲区的号*/
        BufferPos=FindBufferPosition(m_thread_request[i]);
        /*消费处理*/
        EnterCriticalSection(&PC_Critical[BufferPos]);
        printf("消费者 %2d开始消费 %2d产品 \n",m_serial,m_thread_request[i]);
        ((ThreadInfo*)(p))->thread_request[i]=-1;
        if(!IfInOtherRequest(m_thread_request[i]))
        {
            Buffer_Critical[BufferPos]=-1;
            printf("消费者 %2d成功消费 %2d\n",m_serial,m_thread_request[i]);
            printf("\t缓冲区[%2d]:%3d\n",BufferPos,Buffer_Critical[BufferPos]);
            ReleaseSemaphore(empty_semaphore,1,NULL);
        }
        else
        {
            printf("消费者 %2d成功消费 %2d\n",m_serial,m_thread_request[i]);
        }
        LeaveCriticalSection(&PC_Critical[BufferPos]);
    }
    return;
}
/*
test1.txt
5
1 P 5.000000
2 P 4.000000
3 P 2.000000
4 C 6.000000 5 6
5 P 7.000000
6 P 1.000000
7 C 3.000000 1 3 2
*/
原文地址:https://www.cnblogs.com/NoSoul/p/2534946.html