ACE Reactor学习 --UDP

两种IO多路复用方案:Reactor and Proactor

一般情况下,I/O 复用机制需要事件分享器(event demultiplexor [13]). 事件分享器的作用,即将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊: 谁的什么东西送了, 快来拿吧。开发人员在开始的时候需要在分享器那里注册感兴趣的事件,并提供相应的处理者(event handlers),或者是回调函数; 事件分享器在适当的时候会将请求的事件分发给这些handler或者回调函数.

涉及到事件分享器的两种模式称为:Reactor and Proactor [1]. Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的. 在Reactor模式中,事件分离者等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。

而在Proactor模式中,事件处理者(或者代由事件分离者发起)直接发起一个异步读写操作(相当于请求),而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离者得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理者或者回调。举例来说,在Windows上事件处理者投递了一个异步IO操作(称有overlapped的技术),事件分离者等IOCompletion事件完成[1]. 这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。

举另外个例子来更好地理解Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

  • 某个事件处理者宣称它对某个socket上的读事件很感兴趣;
  • 事件分离者等着这个事件的发生;
  • 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理者;
  • 事件处理者收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何做的:

  • 事件处理者直接投递发一个写操作(当然,操作系统必须支持这个异步操作). 这个时候,事件处理者根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个写操作的完成事件。这个处理者很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。
  • 事件分离者等着这个读事件的完成(比较下与Reactor的不同);
  • 当事件分离者默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离者,这个事情我搞完了;
  • 事件分享者通知之前的事件处理者: 你吩咐的事情搞定了;
  • 事件处理者这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理者还像之前一样发起另外一个写操作,和上面的几个步骤一样。

一个Reactor的例子:

事件处理器,继承自 ACE_Event_Handler 

头文件:

class Dgram_Endpoint : public ACE_Event_Handler
{
public:
Dgram_Endpoint(u_short local_port);
~Dgram_Endpoint(void);

int open();

virtual ACE_HANDLE get_handle(void) const;
virtual int handle_input(ACE_HANDLE handle);
virtual int handle_timeout(const ACE_Time_Value & tv, const void *arg = 0);
virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
int send(const char *buf, size_t len, const ACE_INET_Addr &);
private:
ACE_INET_Addr m_adrLocal; //本地地址信息
ACE_SOCK_Dgram endpoint_;


};

源文件:

Dgram_Endpoint::Dgram_Endpoint(u_short local_port):m_adrLocal(local_port),
endpoint_(m_adrLocal),
{
}

Dgram_Endpoint::~Dgram_Endpoint(void)
{
}

int Dgram_Endpoint::open()
{
int nRet = ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
return nRet;
}

int Dgram_Endpoint::send(const char *buf, size_t len, const ACE_INET_Addr &addr)
{
return this->endpoint_.send (buf, len, addr);
}


ACE_HANDLE Dgram_Endpoint::get_handle (void) const
{
return this->endpoint_.get_handle ();
}

int Dgram_Endpoint::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask)
{

//close_mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
//this->reactor()->remove_handler(this, close_mask);


ACE_UNUSED_ARG (handle);
this->endpoint_.close ();
delete this;
return 0;
}

int Dgram_Endpoint::handle_input(ACE_HANDLE)
{
char buf[2048];
memset(buf, 0, 2048);
ACE_INET_Addr from_addr;

//ACE_HANDLE handle = this->endpoint_.get_handle();

ACE_DEBUG ((LM_DEBUG, "(%P|%t) activity occurred on handle %d! ", this->endpoint_.get_handle()));

ssize_t n = this->endpoint_.recv(buf, 2048, from_addr, 0, &ACE_Time_Value(0));

if (n == -1)
{
ACE_ERROR ((LM_ERROR, "%p ", "handle_input"));
}
else
{
ACE_DEBUG ((LM_DEBUG, "(%P|%t) buf of size %d = %*s ", n, n, buf));

处理数据

。。。。。。。

}
return 0;

/////////////////////////////////////////////////////////////////////////////////////////////

}

int Dgram_Endpoint::handle_timeout (const ACE_Time_Value &, const void *)
{
ACE_DEBUG ((LM_DEBUG, "(%P|%t) timed out for endpoint "));
return 0;
}

然后是启动处理器Dgram_Endpoint 

两种方法,一种是使用ACE_TASK,另一种是使用ACE Thread

1. 使用ACE_TASK:

头文件

class CUdpReceiver:public ACE_Task<ACE_MT_SYNCH>
{
public:
CUdpReceiver();
virtual ~CUdpReceiver(void);


public:
virtual int open();
virtual int svc();
virtual int close();
virtual int fini() ;

private:
ACE_thread_t threads[0x01];
Dgram_Endpoint* m_dgramep;

};

源文件

CUdpReceiver::CUdpReceiver( ):m_dgramep(NULL)
{
m_dgramep = new Dgram_Endpoint(TC_PORT_NUM);
}

CUdpReceiver::~CUdpReceiver( )
{
}

int CUdpReceiver::open()
{
//return activate(THR_NEW_LWP, 1, 0, ACE_DEFAULT_THREAD_PRIORITY, -1, this, 0, 0, 0, threads);

/////////////////////////////////////////////////////////////////////////////////////////////////////////////

int nOpen = m_dgramep->open();
if (-1 == nOpen)
{
ACE_ERROR_RETURN ((LM_ERROR, "ACE_Reactor::register_handler"), -1);
return -1;
}

int n = activate(THR_NEW_LWP,1);

return n;

}

int CUdpReceiver::svc()
{
int result = ACE_Reactor::instance()->run_reactor_event_loop();
if(result == -1)
{
return -1;
}
return 0;

}

int CUdpReceiver::close()
{
//ACE_Thread_Manager::instance()->wait();
return 0;

}
int CUdpReceiver::fini()
{
// 关闭与任务相关的消息队列,然后删除队列中的所有消息,并发信号给池中的线程
flush();
// 等待线程池中的线程退出
wait();
// 关闭
close();
return 0;
}

2. 使用ACE Thread启动

 定义线程函数:

  1. static ACE_THR_FUNC_RETURN  LoopThread(void *arg)   
  2. {   
  3.     ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);   
  4.    
  5.     reactor->owner (ACE_OS::thr_self ());   
  6.     //reactor->restart(true);   
  7.     reactor->run_reactor_event_loop ();   
  8.     return 0;   

启动线程函数 :

          Dgram_Endpoint* m_dgramep = new Dgram_Endpoint(TC_PORT_NUM);

          int nOpen = m_dgramep->open();

  •  ACE_Thread_Manager::instance ()->spawn_n
  • (2,LoopThread, ACE_Reactor::instance ());   

最后,通常在main()的最后,等待所有线程退出:

 ACE_Thread_Manager::instance ()->wait ();  

另外,使用Reactor有许多需要注意的地方,参考:

有效使用反应堆Reactor的设计准则 - CSDN博客  http://blog.csdn.net/gunwithrose/article/details/1683374

 有效使用反应堆Reactor的设计准则(转)-GilBert1987-ChinaUnix博客  http://blog.chinaunix.net/uid-17102734-id-2830180.html

其他例子:

[ACE程序员教程笔记]使用ACE_Connector连接服务器 - CSDN博客  http://blog.csdn.net/maxcode/article/details/6126400 

[ACE程序员教程笔记]ACE_Connector使用一个连接多个线程发送数据 - CSDN博客  http://blog.csdn.net/maxcode/article/details/6126551

参考:

udp_reactor.rar udp_reactor.cpp  http://read.pudn.com/downloads206/sourcecode/unix_linux/972540/udp_reactor.cpp__.htm

采用C++的ACE库实现的一个通用的udp通信服务器程序 - CSDN博客  http://blog.csdn.net/itclock/article/details/1036647

原文地址:https://www.cnblogs.com/nanzhi/p/8489212.html