用信号量实现生产者&消费者模型

本文内容:

  1.POSIX信号量介绍

  2.信号量接口函数解释

  3.生产者消费者模型介绍

  4.信号量实现原理

  5.代码展示

一、信号量介绍

  1.POSIX信号量和System V 信号量的作用是一样的,都是用于同步,但是POSIX信号量可以用于线程间同步,也可实现互斥功能;

  2.同步与互斥实现原理

   同步:通过自身计数器对资源进行计数,判断线程是否可以访问临界资源;

      符合则访问,不符合则进行阻塞;当访问条件满足时,唤醒等待队列上的线程进行访问操作。

   互斥:保证计数器的值不大于1,通过计数判断,实现互斥

二、接口函数解释

  1.定义信号量:sem_t sem;

  2.初始化信号量:int sem_init(sem_t* sem, int pshared, unsigned int val);

   sem:信号量

   pshared:判断使用对象,0是线程,1是进程

   val:设置信号量 初始值

  3.信号量的值-1

   sem_wait(sem_t* sem); -- 判断访问条件是否满足,满足则访问,不满足则阻塞等待

   sem_trywait(sem_t* sem); -- 满足则访问,不满足直接报错

   sem_timewait(sem_t* sem, const struct timespec* time); -- 设置超时机制,若超时则报错

  4.信号量的值+1

   sem_post(sem_t* sem); -- 通过信号量唤醒阻塞队列中的pcb(进程/线程)

  5.销毁信号量

   sem_destroy(sem_t* sem); 

三、生产者消费者模型介绍

  1. 为何要使用生产者消费者模型

   生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  2.生产者消费者模型的优点:解耦、支持并发、支持忙闲不均

四、实现原理

  本次展示一个单线程生产者消费者模型

  1.通过一个安全队列用于存储资源,这里采用vector实现队列

   实现队列的循环操作

   (1) _read_step = (_read_step  + 1) % _capacity; 

   (2) _write_step = (_write_step + 1) % _capacity;

  2.同步的实现

   (1) 通过sem_t sem_idle,sem_data进行计数判断

   (2) 通过sem_wait 和 sem_post进行阻塞等待和唤醒操作

   (3) _sem_idle:生产者,若有队列存在空闲,则生产者生产数据,若无则进行阻塞等待,等到存在空闲资源时唤醒

   (4) _sem_data:消费者,若队列存在数据,则消费者读取数据,若无则进行阻塞等待,等待存在数据资源时唤醒

  3.互斥的实现

   通过sem_t _lock计数判断实现互斥

五、代码展示

1. 安全队列结构和成员函数的实现

#include <vector>
#include <semaphore.h>

#define
QUEUESIZE
using namespace std;

class RingQueue{ private: std::vector<int> _queue; // 用数组实现队列的操作 int _capacity; // 队列容量 int _read_step; // 读步 int _write_step;// 写步 private: sem_t _lock; // 用于实现互斥操作 sem_t _sem_idle; // 对空闲资源进行计数,初值为空闲资源的数目,_sem_idle = _capacity;生产者 sem_t _sem_data; // 对数据资源进行计数,初值为数据资源的数目,_sem_data = 0;消费者 public: RingQueue(): _queue(QUEUESIZE), _capacity(QUEUESIZE), _read_step(0), _write_step(0) { // 初始化各个信号量 sem_init(&_lock, 0, 1); sem_init(&_sem_idle, 0, QUEUESIZE); sem_init(&_sem_data, 0, 0); } ~RingQueue(){ // 销毁各个信号量 sem_destroy(&_lock); sem_destroy(&_sem_idle); sem_destroy(&_sem_data); } bool Push(int data){ // 生产者写入数据 // 1.有没有空闲资源,若无则进行阻塞等待 sem_wait(&_sem_idle); // 2.对临界资源进行加锁 sem_wait(&_lock); // 3.写入数据 _queue[_write_step] = data; _write_step = (_write_step + 1) % capacity; // 4.对临界资源进行解锁 sem_post(&_lock); // 5.唤醒消费者读取数据 sem_post(&_sem_data); return true; } int Pop(){ // 消费者读取数据 // 1.有没有数据资源,若无则进行阻塞等待 sem_wait(&_sem_data); // 2.对临界资源进行加锁 sem_wait(&_lock); // 3.读取数据 int data = _queue[_read_step]; _read_step = (_read_step + 1) % capacity; // 4.对临界资源进行解锁 sem_post(&_lock); // 5.唤醒生产者写入数据 sem_post(&_sem_idle); return data; } };

2.main函数

void* pro_func(void* arg){   
   RingQueue* queue = (RingQueue*)arg;
   int i = 0;
   while(1){
      ++i;
      queue->Push(i);
      printf("pro_tid[%d] write:%d to queue
", pthread_self(), i);
      sleep(1);
   }
}
void* con_func(void* arg){
   RingQueue* queue = (RingQueue*)arg;
   int data = 0;
   while(1){
      data = queue->Pop();
      printf("con_tid[%d] read:%d for queue
", pthread_self(), data);
      sleep(2);
   }
}
int main(){
   RingQueue* queue = new RingQueue();
   pthread_t con_tid, pro_tid;
   int con_ret = pthread_create(&_pro_tid, NULL, pro_func, (void*)queue);
   int pro_ret = pthread_create(&_con_tid, NULL, con_func, (void*)queue);
   if(con_ret != 0 || pro_ret != 0){
      perror("thread create fail");
      return -1;
   }
   pthread_join(pro_tid, NULL);
   pthread_join(con_tid, NULL);
   return 0;
}

可参考博客链接:

生产者/消费者模式的理解及实现:https://www.cnblogs.com/luego/p/12048857.html

生产者消费者模式会遇到的问题及解决方案:https://www.cnblogs.com/schips/p/12533081.html

原文地址:https://www.cnblogs.com/bj3251101/p/14446469.html