Linux进程通信之共享内存实现生产者/消费者模式

共享内存 

  共享内存是内核为进程创建的一个特殊内存段,它将出现在进程自己的地址空间中,其它进程可以将同一段共享内存连接(attach)到自己的地址空间。这是最快的进程间通信方式,但是不提供任何同步功能(需要我们信号量实现)。

  

  使用共享内存实现生产者消费者任务模式。

共享内存系统调用 

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int semget(key_t key, int size, int flag);
void *shmat(int shmid, void *addr, int flag);
int shmdt(void *addr);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);

shmget函数:

  功能:获得或创建一个共享内存标识符。

int semget(key_t key, size_t size, int shmflag);
  • 成功返回一个共享内存标识符,失败返回-1;
  • 第一个参数key为共享内存段命名(一般由ftok产生);
  • 第二个参数size为需要共享的内存容量。(如果共享内存已存在时,不能不大于该共享内存段的大小);
  • 第三个参数设置访问权限(低9位)与IPC_CREAT, IPC_EXCL 的按位或。

shmat函数

  功能:将共享内存段连接到一个进程的地址空间中。

void *shmat(int  shm_id, const  void *addr, int shmflg) ;  
  • 成功返回共享存储段连接的实际地址,失败返回-1
  • 第一个参数shm_id为shmget返回的共享内存标识符。
  • 第二个参数addr指明共享内存段要连接到的地址(进程空间内部地址),通常指定为空指针,表示让系统来选择共享内存在进程地址空间中出现的地址。
  • 第三个参数shmflg可以设置为两个标志位(通常设置为0)
  • SHM_RND( 表示第二个参数指定的地址应被向下靠拢到内存页面大小的整数倍)
  • SHM_RDONLY,要连接的共享内存段是只读的。

shmdt函数

  功能:将共享内存从当前进程中分离。

int shmdt(const  void *shmaddr) ;    //其中shmaddr为shmat返回的地址。

shmctl函数

  功能:查看及修改共享内存段的shmid_ds结构,删除该结构以及相连的共享存储段标识。

int shmctl(int  shm_id, int command, struct shmid_ds *buf) ; 
  • 成功返回0,失败返回-1
  • 第二个参数commad取值:
  • IPC_STAT 获取当前共享内存段的shmid_ds结构
  • IPC_SET 把共享内存段的当前关联值设置为shmid_ds结构给出的值
  • IPC_RMID 从系统中删除该共享存储段。
  • 第三个参数buf是一个结构指针,它指向共享内存模式和访问权限的结构

    struct shmid_ds 
    { 
        uid_t shm_perm.uid; 
        uid_t shm_perm.gid; 
        mode_t shm_perm.mode; 
    };

生产者/消费者模式

  某个模块负责产生数据(生产者),另一个模块来负责处理(消费者)。

生产者任务流程图

消费者任务流程图

为什么要使用生产者消费者模式

  • 解耦 ----如果让生产者直接与消费者交互,那么生产者对于消费者就会产生依赖(也就是耦合);
  • 支持并发----传统方式,在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。使用了生产者/消费者模式之后,生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据;
  • 支持忙闲不均----当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。

实现共享内存的生产者消费者模式 

  编写两个应用程序,其中一个应用程序实现生产者任务,一个应用程序实现消费者任务。
生产者任务和消费者任务之间通过共享内存机制实现跨进程的共享缓冲池;在信号量集合中支持一个信号量(或利用一个POSIX信号量),实现对共享缓冲池的互斥访问;缓冲区的分配计数通过修改缓冲池结构中的计数变量来实现。 

  缓冲池结构:

struct shared_use_st 
{ 
     //为0表示对应的缓冲区未被生产者使用,可分配但不可消费;为1表示对应    
     的缓冲区以被生产者使用,不可分配但可消费

    //5个字符串缓冲区
    char Buffer[5][100];
    int Index[5];
};    

  缓冲区的互斥访问,5个缓冲区的缓冲池作为一个临界资源:

  • 当生产者任务从数据源—文件中读取数据后将会申请一个缓冲区,并将此数据放入缓冲区中。
  • 消费者任务从一个缓冲区中取走数据,并将其中的内容打印输出。
  • 当一个生产者任务正在访问缓冲区时,其他生产者和消费者任务不能访问缓冲区
  • 当一个消费者任务正在访问缓冲区时,其他其他生产者和消费者任务不能访问缓冲区
  • 使用信号量集(包含1个信号量,其初始值为1)实现对缓冲池的互斥访问

源码

#Producer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <semaphore.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//缓冲池结构
struct shared_use_st  
{  
    int Index[5];    //5个缓冲池,为0表示对应的缓冲区未被生产者使用,可分配但不可消费;为1表示对应的缓冲区已被生产者使用,不可分配但可消费
    char Buffer[5][TEXT_SZ];    //5个字符串缓冲区
    sem_t sem;        //信号量,同步功能
};

int main()
{
    int running = 1;
    int i = 0;  
    void *shm = NULL;        //共享存储段连接的实际地址
    struct shared_use_st *shared = NULL;
    char buffer[BUFSIZ + 1];        //缓冲区存放字符
    int shmid;        //共享内存标识符
    //获得或创建一个共享内存标识符
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT);
    if(shmid == -1)        //获取或创建一个共享内存标识符失败
    {   
      exit(EXIT_FAILURE);
    }
    shm = shmat(shmid, (void*)0, 0);        //返回共享存储段连接的实际地址
    if(shm == (void*)-1)
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld
", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //缓冲池为共享存储段连接地址
    for( ; i < 5; i++ )
    {
        shared->Index[i] = 0;        //对缓冲池初始化,Index为0表示可以生产
    }
    sem_init(&(shared->sem),1,1);        //信号量化初始化,且信号量初始值为第二个1
    i = 0;
    while(running)        //制造一个循环
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait为P操作,减少信号量的值
        {
            printf("P操作 ERROR!
");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 1; i++)
        ;
        if(i == 5)        //Index为1表示缓冲池被消费者占用
        {  
            //当五个空间都被消费者占用时输出“waiting...”
            sem_post(&shared->sem);        //sem_post为V操作,用来增加信号量的值
            sleep(1);        //sleep一段时间,再次进入循环
            printf("Waiting for some time...
");
        }
        else
        {
            sem_post(&shared->sem);        //V 操作增加信号量
               printf("Enter some text with keyboard: ");
               fgets(buffer, BUFSIZ, stdin);        //读取stdin字符流最多BUFSIZ-1个,并存在buffer数组中 其中stdin是键盘输入到缓冲区的字符
               strncpy(shared->Buffer[i%5], buffer,TEXT_SZ);        //读取的字符串存入缓冲区shared->Buffer中
               shared->Index[i%5] = 1;        //表示该缓冲区被生产者使用了
            if(strncmp(buffer, "end", 3) == 0)        //缓冲区的字符为end时,结束循环
            {
                running = 0;
            }
        }   
     }
     //将共享内存从当前进程中分离
    if(shmdt(shm) == -1)        //失败
    {
        exit(EXIT_FAILURE);
    }
    /*查看及修改共享内存段的shmid_ds结构,删除该结构以及相连的共享存储段标识
    struct shmid_ds  
    {  
        uid_t shm_perm.uid;  
        uid_t shm_perm.gid;  
        mode_t shm_perm.mode;  
    };
    */
    if(shmctl(shmid, IPC_RMID, 0) == -1)        //失败
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}
#Consumer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//缓冲池结构
struct shared_use_st  
{  
    int Index[5];    //5个缓冲池,为0表示对应的缓冲区未被生产者使用,可分配但不可消费;为1表示对应的缓冲区被生产者使用,不可分配但可消费
    char Buffer[5][TEXT_SZ];    //5个字符串缓冲区
    sem_t sem;        //信号量,同步功能
};

int main()
{
    int running = 1;
    int i = 0; 
    void *shm = NULL;        //共享存储段连接的实际地址
    struct shared_use_st *shared = NULL;    //缓冲池
    int shmid;        //声明共享内存标识符
    
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT); //获得或创建一个共享内存标识符
    if(shmid == -1)        //获取或创建一个共享内存标识符失败
    {  
        exit(EXIT_FAILURE);
    }
    
    //将共享内存段连接到一个进程的地址空间中,返回void *指针
    shm = shmat(shmid, 0, 0);    //返回共享存储段连接的实际地址    
    if(shm == (void*)-1)        //失败
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld
", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //缓冲池为共享存储段连接地址
    while(running)
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait为P操作,减少信号量的值
        {
            printf("P操作 ERROR!
");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 0; i++)
           ;
           //五个缓冲区没有都被生产者占用
        if(i != 5)
        {   
            printf("You wrote: %s
", shared->Buffer[i%5]);        //打印出生产者写入的字符
            shared->Index[i%5] = 0;        //为0时,表示已被消费者使用
            sem_post(&shared->sem);        //sem_post为V操作
            sleep(1); 
            if( strncmp(shared->Buffer[i%5], "end", 3) == 0 )        //缓冲区的字符为end时,结束循环
            {
                 running= 0;
            }
        }
        //五个空间都被占用,输出waiting...  
          else
          {
             sem_post(&shared->sem);        //V操作
             sleep(1);  
            printf("Waiting for some time...
");
        }
    }
    //将共享内存从当前进程中分离
    if(shmdt(shm) == -1)        //分离失败
    {
        exit(EXIT_FAILURE);
    }
    if(shmctl(shmid, IPC_RMID, 0) == -1)    //失败
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}

 

原文地址:https://www.cnblogs.com/cjvae/p/9179559.html