基于boost 线程并行技术实现消息队列方式[记录]

#include <queue>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/thread/tss.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>


#include <iostream>
using namespace std;
using namespace boost;


boost::recursive_mutex io_mutex;
boost::condition_variable_any cond;
std::queue<int>  iq;

class printer
{
public:
    printer(boost::asio::io_service& io,int n)
        : strand_(io),
        timer1_(io, boost::posix_time::seconds(1)),
        timer2_(io, boost::posix_time::seconds(1)),
        count_(n)
    {
        timer2_.async_wait(boost::bind(&printer::enqueue, this));
        timer1_.async_wait(boost::bind(&printer::dequeue, this));
       
    }

    ~printer()
    {
         boost::recursive_mutex::scoped_lock  lock(io_mutex);
        std::cout << "Final count is " << count_ << "\n";
    }

    void dequeue()
    {
        boost::recursive_mutex::scoped_lock  lock(io_mutex);
        while(iq.empty())
        {
          cond.wait(lock);       
       
        }
        int pop =0;
        if (!iq.empty())
        {           
            pop = iq.front();
            iq.pop();
            cout<<"------------pop "<<pop<<endl;
        }
        cond.notify_all();
      
        timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
        timer1_.async_wait(boost::bind(&printer::dequeue, this));       
    }

    void enqueue()
    {
        boost::recursive_mutex::scoped_lock  lock(io_mutex);        

        while(iq.size() == 1000)
        {
          cond.wait(lock);           
        }

        iq.push(count_);;
        cond.notify_all();
        cout<<"------------push "<<count_<<endl;
        timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
        timer1_.async_wait(boost::bind(&printer::enqueue, this));          
    }

private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer1_;
    boost::asio::deadline_timer timer2_;
    int count_;
};


int main()
{
    thread_group threads;
    boost::asio::io_service io;
    

    printer * p[1000];
    int num = 1000;
    for(int i = 0; i < num; i++)
    {
        p[i] = new printer(io,i);

        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io));
    }

   
    io.run();   
    threads.join_all();
   

    std::system("pause");
    return 0;
}
原文地址:https://www.cnblogs.com/toosuo/p/2459764.html