ACE_linux:任务 & 命令(Task and Command)

1.涉及类

ACE_Task//ACE任务
ACE_Activation_Queue//ACE命令队列

ACE_Method_Request//ACE请求(命令)

2.简介

ACE主动对象模式

主动对象模式用于降低方法执行和方法调用之间的耦合。该模式描述了另外一种更为透明的任务间通信方法。

传统上,所有的对象都是被动的代码段,对象中的代码是在对它发出方法调用的线程中执行的,当方法被调用时,调用线程将阻塞,直至调用结束。而主动对象却不一样。这些对象具有自己的命令执行线程,主动对象的方法将在自己的执行线程中执行,不会阻塞调用方法。

3.代码示例

 1 #include "ace/Task.h"
 2 #include "ace/Method_Request.h"
 3 #include "ace/Activation_Queue.h"
 4 #include <iostream>
 5 using namespace std;
 6 
 7 class Logger: public ACE_Task<ACE_MT_SYNCH>
 8 {
 9 public:
10     Logger() {}
11     int svc();
12     void LogMsg(const string& msg);
13     void LogMsgActive (const string& msg);
14 private:
15     ACE_Activation_Queue cmdQueue;    //命令队列
16 };
17 class LogMsgCmd: public ACE_Method_Request
18 {
19 public:
20     LogMsgCmd(Logger *log,const string& msg)
21     {
22         this->log=log;
23         this->msg=msg;
24     }
25     virtual int call()
26     {
27         log->LogMsg(msg);
28         return 0;
29     }
30 private:
31     Logger *log;
32     string msg;
33 };
34 void Logger::LogMsg(const string& msg)
35 {
36     cout<<msg<<endl;
37 }
38 //以主动的方式记录日志
39 void Logger::LogMsgActive(const string& msg)
40 {
41     //生成命令对象,插入到命令队列中
42     cmdQueue.enqueue(new LogMsgCmd(this,msg));   //enqueue  入队列
43 }
44 int Logger::svc()
45 {
46     while(true)
47     {
48         //遍历命令队列,执行命令(auto_ptr所做的事情,就是动态分配对象以及当对象不再需要时自动执行清理)
49         auto_ptr<ACE_Method_Request> pMsgCmd(cmdQueue.dequeue ());  //dequeue 出队列
50         pMsgCmd->call();
51     }
52     return 0;
53 }
54 int main (int argc, ACE_TCHAR *argv[])
55 {
56     Logger log;
57     log.activate();
58 
59     for(int i=0;i<4;i++)
60     {
61         log. LogMsgActive ("hello");
62         ACE_OS::sleep(1);
63         log.LogMsgActive("book");
64         ACE_OS::sleep(1);
65     }
66 
67     log.wait();
68     return 0;
69 }
Logger.cpp

4.结果

1 $ ./tt
2 hello
3 book
4 hello
5 book
6 hello
7 book
8 hello
9 book
View Code

**************************************************************************************

高级篇:一个基于ACE的负载自适应万能线程池实现

转摘自:http://www.cppblog.com/cppx/archive/2011/02/28/140808.html#140836

**************************************************************************************

i.代码

 1 #include <map>
 2 #include "ace/Task.h" 
 3 
 4 // 线程状态
 5 enum LF_Status_t
 6 {
 7     TH_LEADER_ACTIVE,
 8     TH_FOLLOWER,
 9     TH_WORKER,
10     TH_READY,
11     TH_STOP,
12 };
13 
14 // 线程属性
15 struct LF_StatusTime_t
16 {
17     LF_Status_t    status;
18     ACE_Time_Value working_tv;  //线程最后一次任务的时间间隔
19     ACE_Time_Value start_time;  //线程开始时间
20     ACE_Time_Value stop_time;   //线程结束时间
21     ACE_Time_Value work_start;  //线程最后一次任务的开始时间
22     ACE_Time_Value work_time;   //截止目前线程运行总时间
23 };
24 
25 // 线程集合
26 typedef std::map<ACE_thread_t,LF_StatusTime_t>  LF_StatusTimeList_t;
27 
28 
29 // 追随者
30 class LF_Follower
31 {
32 public:
33     LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) 
34     {
35         m_owner = ACE_Thread::self();
36     }
37     int wait(void){
38         return m_cond.wait();
39     }
40     int signal(void){
41         return m_cond.signal();
42     }
43     ACE_thread_t owner(void){
44         return m_owner;
45     }
46 
47 private:
48     ACE_Condition<ACE_Thread_Mutex> m_cond;
49     ACE_thread_t                    m_owner;
50 
51 
52 };
53 
54 
55 // 领导者-追随者线程池 模式实现
56 class LeaderFollower
57 {
58 public:
59     LeaderFollower(void);
60     ~LeaderFollower(void);
61 
62 protected:
63     LF_Follower * make_follower(void);//增加追随者
64     int become_leader(void);//自己轮岗为领导,若没有空缺,先等待
65     int elect_new_leader(void);//领导离任前,选举出新领导
66     bool leader_active(void);
67     void set_active_leader(ACE_thread_t leader);
68 
69 private:
70     ACE_thread_t                        m_current_leader;//领导者(角色转换--轮岗)
71     ACE_Unbounded_Queue<LF_Follower*>   m_followers;     //追随者们(领导者是其中的头一个)
72     ACE_Thread_Mutex                    m_followers_lock;
73     ACE_Thread_Mutex                    m_leader_lock;
74 
75     //////////////////////////////////////////////////////////////////////////
76     /// 线程池状态监控
77 public:
78     const LF_StatusTimeList_t & get_status(void) const;   //现状,线程集合
79     const float get_load_rate(void) const;                //获取的负载率
80 
81 protected:
82     void set_status(LF_Status_t status);
83     void set_worktime(ACE_Time_Value work_time);
84 
85 private:
86     LF_StatusTimeList_t m_status_time_list;
87     ACE_Thread_Mutex    m_status_lock;
88 };
LeaderFollower.h
  1 #include "LeaderFollower.h"
  2 
  3 //////////////////////////////////////////////////////////////////////////
  4 LeaderFollower::LeaderFollower(void) :
  5 m_current_leader(0)
  6 {
  7 }
  8 
  9 LeaderFollower::~LeaderFollower(void)
 10 {
 11 }
 12 
 13 LF_Follower * 
 14 LeaderFollower::make_follower( void )
 15 {
 16     ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
 17 
 18     LF_Follower *fw;
 19     ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
 20     m_followers.enqueue_tail(fw);
 21     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower 	: Now has %d followers.
"), m_followers.size()));
 22     return fw;
 23 }
 24 
 25 int 
 26 LeaderFollower::become_leader( void )
 27 {
 28     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
 29 
 30     if( leader_active() && m_current_leader != ACE_Thread::self() ){
 31         while(leader_active()){
 32             set_status(TH_FOLLOWER);
 33             auto_ptr<LF_Follower> fw(make_follower());
 34             fw->wait();         // Wait until told to do so.
 35         }
 36     }
 37 
 38     // Mark yourself as the active leader.
 39     set_active_leader(ACE_Thread::self());
 40     set_status(TH_LEADER_ACTIVE);
 41     //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader 	: Becoming the leader.
")));
 42     return 0;
 43 }
 44 
 45 int 
 46 LeaderFollower::elect_new_leader( void )
 47 {
 48     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
 49 
 50     set_active_leader(0);
 51 
 52     // Wake up a follower
 53     if( !m_followers.is_empty() ){
 54         ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
 55 
 56         // Get the old follower.
 57         LF_Follower *fw;
 58         if( m_followers.dequeue_head(fw) != 0 )
 59             return -1;
 60 
 61         //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.
"), fw->owner()));
 62         return (fw->signal() == 0) ? 0 : -1;
 63     }
 64 
 65     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left
")));
 66     return -1;
 67 }
 68 
 69 bool 
 70 LeaderFollower::leader_active( void )
 71 {
 72     return (m_current_leader != 0);
 73 }
 74 
 75 void 
 76 LeaderFollower::set_active_leader( ACE_thread_t leader )
 77 {
 78     m_current_leader = leader;
 79 }
 80 
 81 void LeaderFollower::set_worktime( ACE_Time_Value work_time )
 82 {
 83     ACE_GUARD(ACE_Thread_Mutex, status_worktime, m_status_lock);
 84     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
 85     info.working_tv = work_time;
 86 }
 87 
 88 void LeaderFollower::set_status( LF_Status_t status )
 89 {
 90     ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
 91     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
 92     switch(status)
 93     {
 94     case TH_READY:
 95         info.start_time = ACE_OS::gettimeofday();
 96         break;
 97     case TH_STOP:
 98         info.stop_time = ACE_OS::gettimeofday();
 99         break;
100     case TH_WORKER:
101         info.work_start = ACE_OS::gettimeofday();
102         break;
103     case TH_LEADER_ACTIVE:
104     case TH_FOLLOWER:
105         if( info.status == TH_WORKER )
106             info.work_time += ACE_OS::gettimeofday() - info.work_start;
107         break;
108     }
109     info.status = status;
110 }
111 
112 const LF_StatusTimeList_t & 
113 LeaderFollower::get_status( void ) const
114 {
115     return m_status_time_list;
116 }
117 
118 const float 
119 LeaderFollower::get_load_rate( void ) const
120 {
121     ACE_Time_Value work_time,run_time;
122 
123     for(LF_StatusTimeList_t::const_iterator iter = m_status_time_list.begin(); iter != m_status_time_list.end(); ++iter)
124         //foreach(const LF_StatusTimeList_t::value_type & info,get_status())
125     {
126         if( iter->second.status != TH_STOP ){
127             work_time += iter->second.work_time;
128             run_time += ACE_OS::gettimeofday() - iter->second.start_time;
129         }
130     }
131     return (float)work_time.usec()/run_time.usec()*100;
132 }
LeaderFollower.cpp
 1 #include "LeaderFollower.h"
 2 
 3 #include "ace/Activation_Queue.h"
 4 #include "ace/Method_Request.h"
 5 
 6 class ACE_Export LF_ThreadPool :
 7     public ACE_Task_Base,
 8     public LeaderFollower
 9 {
10 public:
11     LF_ThreadPool(void);
12     ~LF_ThreadPool(void);
13 
14     virtual int svc(void);
15 
16     int start_stread_pool( void );
17     int stop_thread_pool( void );
18     int post_request( ACE_Method_Request *request );// 任务请求接口
19 
20     int get_queue_load(void){ return m_activation_queue_.method_count(); }
21     int get_max_thread(void){ return MaxThreadNum; }
22     int get_min_thread(void){ return MinThreadNum; }
23 
24 private:
25     int _fork_new_thread( void );
26     int _post_exit_request(void);
27 
28 private:
29     class ExitRequest : public ACE_Method_Request //内置退出请求
30     {
31     public:
32         virtual int call (void){
33             return -1;  // Cause exit.
34         }
35     };
36 
37     bool m_bShutdown;
38     bool m_bRunning;
39     ACE_Activation_Queue m_activation_queue_;  ////命令队列
40 
41     static const size_t ScheduleTime = 10;
42     static const size_t MinThreadNum = 2;
43     static const size_t MaxThreadNum = 20;
44 
45 };
LF_ThreadPool.h
  1 #include "LF_ThreadPool.h"
  2 
  3 LF_ThreadPool::LF_ThreadPool(void) :
  4 m_bShutdown(false),
  5 m_bRunning(false)
  6 {
  7 }
  8 
  9 LF_ThreadPool::~LF_ThreadPool(void)
 10 {
 11 }
 12 
 13 int LF_ThreadPool::svc( void )
 14 {
 15     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.	: %d working threads left.
"),thr_count()));
 16 
 17     // 线程开始运行
 18     m_bRunning = true;
 19 
 20     set_status(TH_READY);
 21 
 22     while(true){
 23         // Block until this thread is the leader.
 24         become_leader();
 25 
 26         // 设置线程空闲时间,空闲线程将会自动退出
 27         ACE_Time_Value tv(ScheduleTime);
 28         tv += ACE_OS::gettimeofday();
 29 
 30         // 从队列获取下一个请求,并获得所有权;长时间没有请求,dequeue超时返回
 31         auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
 32         if( request.get() == NULL )
 33         {                                               
 34             // 成功选择新的领导者,且工作线程数大于最少线程数-- 结束当前线程
 35             if( elect_new_leader() == 0 && thr_count() > MinThreadNum )         
 36             { break;    }                                                      
 37             if( thr_count() < MinThreadNum )      // 工作线程数小于最少线程数,创建新的线程
 38             { _fork_new_thread();}
 39             continue;                    // 继续担当领导者(优先成为领导者),或返回线程池等待
 40         }
 41 
 42         // Elect a new leader then process the request
 43         // 没有空余线程可成为领导者,或者线程池容量调整
 44         if( elect_new_leader() != 0 || thr_count() < MinThreadNum )             
 45         {
 46             if( !m_bShutdown )                                 // 且没有调度关闭
 47                 if( thr_count() < MaxThreadNum )                 // 未达到线程数上线
 48                     _fork_new_thread();                            // 创建新的线程
 49         }
 50 
 51         // Invoke the method request.    调用
 52         set_status(TH_WORKER);
 53 
 54         ACE_Time_Value tv_start,tv_finish,tv_working;
 55         tv_start = ACE_OS::gettimeofday();
 56 
 57         int result = request->call();   // 调用
 58 
 59         tv_finish = ACE_OS::gettimeofday();
 60         tv_working = tv_finish - tv_start;
 61         set_worktime(tv_working);      // 任务执行耗时
 62 
 63         // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
 64         if( result == -1 )
 65         {
 66             if( thr_count() > 1 )      
 67                 _post_exit_request();
 68             break;
 69         }
 70     }
 71 
 72     // 剩下最后一个线程,线程池停止
 73     if( thr_count() == 1 )
 74         m_bRunning = false;
 75 
 76     set_status(TH_STOP);
 77     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.	: %d working threads left.
"),thr_count()-1));
 78     return 0;
 79 }
 80 
 81 int LF_ThreadPool::start_stread_pool( void )
 82 {
 83     m_bShutdown = false;
 84     int iRet = activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
 85     ACE_OS::sleep(1);//启动线程需要1秒钟,不然会出异常
 86     return  iRet;
 87 }
 88 
 89 int LF_ThreadPool::stop_thread_pool( void )
 90 {
 91     // 线程池已停止
 92     if( !m_bRunning )
 93         return 0;
 94 
 95     m_bShutdown = true;
 96     _post_exit_request();
 97     return wait();
 98 }
 99 
100 int LF_ThreadPool::post_request( ACE_Method_Request *request )
101 {
102     ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
103     return m_activation_queue_.enqueue (request);
104 }
105 
106 int LF_ThreadPool::_fork_new_thread( void )
107 {
108     return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
109 }
110 
111 int LF_ThreadPool::_post_exit_request( void )
112 {
113     return post_request(new ExitRequest);
114 }
LF_ThreadPool.cpp
 1 #include <iostream>
 2 
 3 #include "LF_ThreadPool.h"
 4 using std::string;
 5 
 6 
 7 class Logger
 8 {
 9 public:
10     Logger() {}
11     int init();
12     int end();
13     int currentNumberOfThreads(){return m_pool.thr_count();}
14     void LogMsg(const string& msg);//记录日志
15     void LogMsgActive (const string& msg);//记录日志触发
16 private:
17     LF_ThreadPool m_pool;    //命令队列
18 };
19 
20 
21 
22 class LogMsgCmd: public ACE_Method_Request
23 {
24 public:
25     LogMsgCmd(Logger *log,const string& msg)
26     {
27         this->log=log;
28         this->msg=msg;
29     }
30     virtual int call()
31     {
32         log->LogMsg(msg);
33         return 0;
34     }
35 private:
36     Logger *log;
37     string msg;
38 };
39 void Logger::LogMsg(const string& msg)
40 {
41     std::cout<<ACE_Thread::self()<<" : "<<msg<<std::endl;
42     ACE_OS::sleep(4);
43 }
44 //以主动的方式记录日志
45 void Logger::LogMsgActive(const string& msg)
46 {
47     //生成命令对象,插入到命令队列中
48     m_pool.post_request(new LogMsgCmd(this,msg));
49 }
50 int Logger::init()
51 {
52     m_pool.start_stread_pool();
53     return 0;
54 }
55 int Logger::end()
56 {
57     m_pool.stop_thread_pool();
58     return 0;
59 }
60 int main (int argc, ACE_TCHAR *argv[])
61 {
62     ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%t) Thread main is started.
"))); 
63     Logger log;
64     log.init();
65 
66     for(int i=0;i<3;i++)
67     {
68         log.LogMsgActive ("hello");
69         //ACE_OS::sleep(1);
70         log.LogMsgActive("book");
71         //ACE_OS::sleep(1);
72     }
73 
74     ACE_OS::sleep(2);
75     std::cout<<"Current number of threads : "<<log.currentNumberOfThreads()<<std::endl;
76     log.end();
77     ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%t) Thread main is stoped.
"))); 
78     return 0;
79 }
test.cpp

ii.结果

$ ./tt
(140498135467808) Thread main is started.
140498124969728 : hello
140498135459584 : book
140498043926272 : hello
140498033436416 : book
140498022946560 : hello
140498012456704 : book
Current number of threads : 7
(140498135467808) Thread main is stoped.
View Code

领导者/追随者线程池模型:在一组预先分配的线程中通过“互斥”锁来同步线程之间的行为,“线程”们通过“民主选举”选出一位代表“领导者”站在最前端接收请求,拿到“任务”后,就从身后的候选“继任者”中选出一个线程代替自己作为“领导者”,自己则变成“工作者”就跑到后面默默去执行处理命令,这个“任务”是一个包含待处理数据和处理逻辑的自说明性任务,也就是说所有的线程不必事先知道怎么处理接收到的任务,因为他所拿到的“任务包”中就包含了如何处理任务的说明。就像一个“代工工厂”的工人一样,无需任何文化基础,会干活就行。

GOOD LUCK !

原文地址:https://www.cnblogs.com/book-gary/p/4241564.html