分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题

Boost的互斥量,条件变量做了很好的封装,因此比“原生的”POSIX mutex,condition variables好用。然后我们会通过分析boost相关源码看一下boost linux是如何对pthread_mutex_t和pthread_cond_t进行的封装。

首先看一下condition_variable_any的具体实现,代码路径:/boost/thread/pthread/condition_variable.hpp

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. class condition_variable_any  
  2. {  
  3.     pthread_mutex_t internal_mutex;  
  4.     pthread_cond_t cond;  
  5.   
  6.     condition_variable_any(condition_variable_any&);  
  7.     condition_variable_any& operator=(condition_variable_any&);  
  8.   
  9. public:  
  10.     condition_variable_any()  
  11.     {  
  12.         int const res=pthread_mutex_init(&internal_mutex,NULL);  
  13.         if(res)  
  14.         {  
  15.             boost::throw_exception(thread_resource_error());  
  16.         }  
  17.         int const res2=pthread_cond_init(&cond,NULL);  
  18.         if(res2)  
  19.         {  
  20.             BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));  
  21.             boost::throw_exception(thread_resource_error());  
  22.         }  
  23.     }  
  24.     ~condition_variable_any()  
  25.     {  
  26.         BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));  
  27.         BOOST_VERIFY(!pthread_cond_destroy(&cond));  
  28.     }  
  29.    

condition_variable_any的构造函数是对于内部使用的mutex和cond的初始化,对应的,析构函数则是这些资源的回收。

 

BOOST_VERIFY的实现:

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. #undef BOOST_VERIFY    
  2. #if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) )    
  3. // 在任何情况下,expr一定会被求值。    
  4. #define BOOST_VERIFY(expr) ((void)(expr))    
  5. #else    
  6. #define BOOST_VERIFY(expr) BOOST_ASSERT(expr)    
  7. #endif    

因此不同于assert在Release版的被优化掉不同,我们可以放心的使用BOOST_VERITY,因此它的表达式肯定会被求值,而不用担心assert的side effect。
接下来看一下condition_variable_any的核心实现:wait

 

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. template<typename lock_type>  
  2.  void wait(lock_type& m)  
  3.  {  
  4.      int res=0;  
  5.      {  
  6.          thread_cv_detail::lock_on_exit<lock_type> guard;  
  7.          detail::interruption_checker check_for_interruption(&internal_mutex,&cond);  
  8.          guard.activate(m);  
  9.          res=pthread_cond_wait(&cond,&internal_mutex);  
  10.          this_thread::interruption_point();  
  11.      }  
  12.      if(res)  
  13.      {  
  14.          boost::throw_exception(condition_error());  
  15.      }  
  16.  }  

首先看一下lock_on_exit:

 

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. namespace thread_cv_detail  
  2. {  
  3.     template<typename MutexType>  
  4.     struct lock_on_exit  
  5.     {  
  6.         MutexType* m;  
  7.   
  8.         lock_on_exit():  
  9.             m(0)  
  10.         {}  
  11.   
  12.         void activate(MutexType& m_)  
  13.         {  
  14.             m_.unlock();  
  15.             m=&m_;  
  16.         }  
  17.         ~lock_on_exit()  
  18.         {  
  19.             if(m)  
  20.             {  
  21.                 m->lock();  
  22.             }  
  23.        }  
  24.     };  
  25. }  

代码很简单,实现了在调用activate时将传入的lock解锁,在该变量生命期结束时将guard的lock加锁。

 

接下来的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什么意思呢?From /boost/thread/pthread/thread_data.hpp

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. class interruption_checker  
  2. {  
  3.     thread_data_base* const thread_info;  
  4.     pthread_mutex_t* m;  
  5.     bool set;  
  6.   
  7.     void check_for_interruption()  
  8.     {  
  9.         if(thread_info->interrupt_requested)  
  10.         {  
  11.             thread_info->interrupt_requested=false;  
  12.             throw thread_interrupted();  
  13.         }  
  14.     }  
  15.   
  16.     void operator=(interruption_checker&);  
  17. public:  
  18.     explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):  
  19.         thread_info(detail::get_current_thread_data()),m(cond_mutex),  
  20.         set(thread_info && thread_info->interrupt_enabled)  
  21.     {  
  22.         if(set)  
  23.         {  
  24.             lock_guard<mutex> guard(thread_info->data_mutex);  
  25.             check_for_interruption();  
  26.             thread_info->cond_mutex=cond_mutex;  
  27.             thread_info->current_cond=cond;  
  28.             BOOST_VERIFY(!pthread_mutex_lock(m));  
  29.         }  
  30.         else  
  31.         {  
  32.             BOOST_VERIFY(!pthread_mutex_lock(m));  
  33.         }  
  34.     }  
  35.     ~interruption_checker()  
  36.     {  
  37.         if(set)  
  38.         {  
  39.             BOOST_VERIFY(!pthread_mutex_unlock(m));  
  40.             lock_guard<mutex> guard(thread_info->data_mutex);  
  41.             thread_info->cond_mutex=NULL;  
  42.             thread_info->current_cond=NULL;  
  43.         }  
  44.         else  
  45.         {  
  46.             BOOST_VERIFY(!pthread_mutex_unlock(m));  
  47.         }  
  48.     }  

代码面前,毫无隐藏。那句话就是此时如果有interrupt,那么就interrupt吧。否则,lock传入的mutex,也是为了res=pthread_cond_wait(&cond,&internal_mutex);做准备。

 

关于线程的中断点,可以移步《【Boost】boost库中thread多线程详解5——谈谈线程中断》。

对于boost::mutex,大家可以使用同样的方法去解读boost的实现,相对于condition variable,mutex的实现更加直观。代码路径:/boost/thread/pthread/mutex.hpp。

气味

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. namespace boost  
  2. {  
  3.     class mutex  
  4.     {  
  5.     private:  
  6.         mutex(mutex const&);  
  7.         mutex& operator=(mutex const&);  
  8.         pthread_mutex_t m;  
  9.     public:  
  10.         mutex()  
  11.         {  
  12.             int const res=pthread_mutex_init(&m,NULL);  
  13.             if(res)  
  14.             {  
  15.                 boost::throw_exception(thread_resource_error());  
  16.             }  
  17.         }  
  18.         ~mutex()  
  19.         {  
  20.             BOOST_VERIFY(!pthread_mutex_destroy(&m));  
  21.         }  
  22.   
  23.         void lock()  
  24.         {  
  25.             int const res=pthread_mutex_lock(&m);  
  26.             if(res)  
  27.             {  
  28.                 boost::throw_exception(lock_error(res));  
  29.             }  
  30.         }  
  31.   
  32.         void unlock()  
  33.         {  
  34.             BOOST_VERIFY(!pthread_mutex_unlock(&m));  
  35.         }  
  36.   
  37.         bool try_lock()  
  38.         {  
  39.             int const res=pthread_mutex_trylock(&m);  
  40.             if(res && (res!=EBUSY))  
  41.             {  
  42.                 boost::throw_exception(lock_error(res));  
  43.             }  
  44.   
  45.             return !res;  
  46.         }  
  47.   
  48.         typedef pthread_mutex_t* native_handle_type;  
  49.         native_handle_type native_handle()  
  50.         {  
  51.             return &m;  
  52.         }  
  53.   
  54.         typedef unique_lock<mutex> scoped_lock;  
  55.         typedef detail::try_lock_wrapper<mutex> scoped_try_lock;  
  56.     };   
  57. }  

 

boost对于pthread_mutex_t和pthread_cond_t的封装,方便了开发者的使用的资源的安全有效管理。当然,在不同的公司,可能也都有类似的封装,学习boost的源码,无疑可以加深我们的理解。在某些特定的场合,我们也可以学习boost的封装方法,简化我们的日常开发。

最后,奉上简单的生产者、消费者的boost的实现,和前文《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》相比,我们可以看到boost简化了mutex和condition variable的使用。以下代码引自《Boost程序库完全开发指南》:

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
  1. #include <boost/thread.hpp>  
  2. #include <stack>  
  3. using std::stack;  
  4. using std::cout;  
  5. class buffer  
  6. {  
  7. private:  
  8.     boost::mutex mu; // 条件变量需要配合互斥量  
  9.     boost::condition_variable_any cond_put; // 生产者写入  
  10.     boost::condition_variable_any cond_get; // 消费者读走  
  11.   
  12.     stack<int> stk;  
  13.     int un_read;  
  14.     int capacity;  
  15.   
  16.     bool is_full()  
  17.     {  
  18.         return un_read == capacity;  
  19.     }  
  20.     bool is_empty()  
  21.     {  
  22.         return 0 == un_read;  
  23.     }  
  24.   
  25. public:  
  26.     buffer(size_t capacity) : un_read(0), capacity(capacity)  
  27.     {}  
  28.     void put(int x)  
  29.     {  
  30.   
  31.         boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类  
  32.   
  33.         while (is_full())  
  34.         {  
  35.             cout << "full waiting..." << endl;  
  36.             cond_put.wait(mu); // line:51  
  37.         }  
  38.         stk.push(x);  
  39.         ++un_read;  
  40.   
  41.         cond_get.notify_one();  
  42.     }  
  43.     void get(int *x)  
  44.     {  
  45.         boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类  
  46.   
  47.         while (is_empty())  
  48.         {  
  49.             cout << "empty waiting..." << endl;  
  50.             cond_get.wait(mu);  
  51.         }  
  52.         *x = stk.top();  
  53.         stk.pop();  
  54.         --un_read;  
  55.   
  56.         cond_put.notify_one(); // 通知 51line可以写入了  
  57.     }  
  58. };  
  59.   
  60. buffer buf(5);   
  61.   
  62. void producer(int n)  
  63. {  
  64.     for (int i = 0; i < n; ++i)  
  65.     {  
  66.         cout << "put : " << i << endl;  
  67.         buf.put(i);  
  68.     }  
  69. }  
  70.   
  71. void consumer(int n)  
  72. {  
  73.     int x;  
  74.     for (int i = 0; i < n; ++i)  
  75.     {  
  76.         buf.get(&x);  
  77.         cout << "get : " << x << endl;  
  78.     }  
  79. }  
  80.   
  81. int main()  
  82. {  
  83.     boost::thread t1(producer, 20);  
  84.     boost::thread t2(consumer, 10);  
  85.     boost::thread t3(consumer, 10);  
  86.   
  87.     t1.join();  
  88.     t2.join();  
  89.     t3.join();  
  90.   
  91.     return 0;  
  92. }  

最后说一句,condition_variable_any == condition, from /boost/thread/condition.hpp

 

 

[cpp] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. namespace boost  
  2. {  
  3.     typedef condition_variable_any condition;  
  4. }  

原文地址:https://www.cnblogs.com/ghostll/p/3546077.html