一个ACE 架构的 Socket Client

.h

/**************************************************************
 *  Filename:    TcpClient.h
 *  Copyright:   Shanghai X Co., Ltd.
 *
 *  Description: TcpClient头文件.
 *
 *  @author:     w
 *  @version     10/28/2016  @Reviser  Initial Version
 **************************************************************/

#ifndef _TCPCLIENT_
#define _TCPCLIENT_

#include <string>
#include <ace/Svc_Handler.h>
#include <ace/Connector.h>
#include <ace/SOCK_Connector.h>
#include <ace/Task.h>
 
using namespace std; 

//连接状态改变时回调
typedef void (__stdcall *pfnConnectChangeCallBack)(bool);
//接收到数据时回调
typedef void (__stdcall *pfnReceiveCallBack)(char*, const int); 

class CTcpClient : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
{
public:
    // 是否退出的标识
    long    m_lStop;                    

public:
    // 是否允许重连 
    bool    m_nReconnect;                
    // 通信超时ms 
    int        m_nCommunicateTimeOut;        

public:    
    //typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> Base;    
    CTcpClient();
    virtual ~CTcpClient();
    
public:
    /**
     *  设置连接参数.
     *         
     *  @param        -[in]  char* szHost: [主链接地址]; 
     *  @param        -[in]  char* szBackup: [备连接地址]; 
     *  @param        -[in]  int nRemotePort: [目标端口号]; 
     *  @param        -[in]  int nLocalPort: [本地端口号]; 
     *  @return        int.  
     *  @version    10/28/2016    w   Initial Version 
     */
    virtual long SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort = 0);
    /**
     *  首次连接. 
     *
     *  @return        int. 
     *  @version    10/28/2016    w   Initial Version 
     */
    virtual int Connect();
    
    /**
    *  断开连接. 
    *           
    *  @return        int. 
    *  @version        10/28/2016    w   Initial Version 
    */
    virtual int Reconnect();
    /**
    *  断开连接. 
    *
    *  @return        int.  
    *  @version        10/28/2016    w   Initial Version 
    */
    virtual int Disconnect();

    /**
     *  发送数据.
     *
     *  @param        -[in,out]  char* szSend: [数据]
     *  @param        -[in]  char* lSendSize: [大小]
     *  @param        -[in]  int nCommunicateTimeOut: [超时ms] 
     *  @return        int.  
     *  @version    10/28/2016    w   Initial Version 
     */
    virtual long Send(const char* szSend, long lSendSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT);

    /**
     *  接收数据.
     *
     *  @param        -[in,out]  char* szReceive: [数据]
     *  @param        -[in]  long lReceiveSize: [大小]
     *  @param        -[in]  int nCommunicateTimeOut: [超时ms]  
     *  @version    10/28/2016    w   Initial Version 
     */
    virtual long Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT);
    //
    virtual bool IsConnected() { return m_nIsConnected; } 
    /**
    *  设置连接改变回调函数.  
    *
    *  @version    10/28/2016    w   Initial Version 
    */
    void SetOnConnectChangeCallBack(pfnConnectChangeCallBack func);
    /**
    *  设置数据接收回调函数.  
    *
    *  @version    10/28/2016    w   Initial Version 
    */
    void SetOnReceiveCallBack(pfnReceiveCallBack func);

public:      
    /**
     *  建立连接时被调用.
     *
     *  @param        -[in,out]  char* param: [参数] 
     *  @return        int.  
     *  @version    10/28/2016    w   Initial Version 
     */
    int open(void* param = 0);    
    
    /**
     *  当有输入时该函数被调用.
     *
     *  @param        -[in]  ACE_HANDLE: [参数] 
     *  @return        int.  
     *  @version    10/28/2016    w   Initial Version 
     */
    int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);     
    
    /**
     *  当有输出时该函数被调用.
     *
     *  @param        -[in]  ACE_HANDLE handle: [参数] 
     *  @return        int.  
     *  @version    10/28/2016    w   Initial Version 
     */
    virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);    
    
    /**
     *  当SockHandler从ACE_Reactor中移除时该函数被调用.
     *
     *  @param        -[in]  ACE_HANDLE handle: [参数]
     *  @param        -[in]  ACE_HANDLE closeMask: [参数] 
     *  @return        int 
     *  @version    10/28/2016    w   Initial Version 
     */
    int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask);    
    
    /**
     *  任务的主流程.
     *        1.激活事件
     *
     *  @return        int. 
     *  @version    10/10/2016    w   Initial Version
     */
    int    svc();


protected: 
    /**
     *  触发连接改变回调函数.
     *         
     *  @param        -[in]  bool nIsConnected: [是否已连接]
     *
     *  @version    10/28/2016    w   Initial Version 
     */
    void OnConnectChange(bool nIsConnected);

    /**
     *  触发数据接收回调函数.
     *         
     *  @param        -[in,out]  char* pszReceive: [接收的数据区]
     *  @param        -[in]  const int nReceiveSize: [数据大小] 
     *  @version    10/28/2016    w   Initial Version 
     */
    void OnReceive(char* pszReceive, const int nReceiveSize); 

protected:  
    // 是否已经连接
    bool    m_nIsConnected;                
    // 当前连接IP地址
    string    m_strConnectIPAddress;        
    // 主线连接IP地址
    string    m_strHostIPAddress;        
    // 备线连接IP地址
    string    m_strBackupIPAddress;        
    // 远程连接端口号
    unsigned short m_nRemotePort;        
    // 本地连接端口号
    unsigned short m_nLocalPort;        
     
    pfnConnectChangeCallBack m_pfnOnConnectChange;
    pfnReceiveCallBack m_pfnOnReceive; 
    
protected:
    // 最后一次连接时间
    time_t    m_tmLastConnect;            

private:
    /**
    *  关闭Socket. 
    *
    *  @return        int.  
    *  @version        10/28/2016    w   Initial Version 
    */
    int CloseSocket();                     

};
//typedef ACE_Connector<CTcpClient, ACE_SOCK_CONNECTOR>  CONNECTOR;

#endif // !_TCPCLIENT_

.cpp

/**************************************************************
 *  Filename:    TcpClient.cpp
 *  Copyright:   Shanghai X Co., Ltd.
 *
 *  Description: TcpClient源文件.
 *
 *  @author:     w
 *  @version     10/28/2016  @Reviser  Initial Version
 **************************************************************/
 
#include "TcpClient.h" 
#include <iostream>
#include <string>
 
#include <ace/ACE.h>
#include <ace/OS_NS_sys_socket.h>
#include <ace/OS_NS_strings.h>

using namespace std;

//ctor
CTcpClient::CTcpClient()
{ 
    m_lStop = true; 
    m_nReconnect = true;
    m_nIsConnected = false; 

    m_pfnOnConnectChange = NULL;
    m_pfnOnReceive = NULL;

    m_strConnectIPAddress = "";    
    m_strHostIPAddress = "";        
    m_strBackupIPAddress = "";    
    m_nRemotePort = 0;    
    m_nLocalPort = 0;         

    m_tmLastConnect = 0;     
}
//dctor
CTcpClient::~CTcpClient()
{
    m_lStop = true; 
    wait();
    //
    close();
    m_pfnOnConnectChange = NULL;
    m_pfnOnReceive = NULL;
}  


long CTcpClient::SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort)
{ 
    m_strConnectIPAddress =    m_strHostIPAddress = szHost; 
    m_strBackupIPAddress = szBackup; 
    m_nRemotePort = nRemotePort; 
    m_nLocalPort = nLocalPort;
    return 0;
}

void CTcpClient::SetOnConnectChangeCallBack(pfnConnectChangeCallBack func)
 {
     this->m_pfnOnConnectChange = func; 
 }

void CTcpClient::SetOnReceiveCallBack(pfnReceiveCallBack func)
 {
     this->m_pfnOnReceive = func; 
 }

void CTcpClient::OnConnectChange(bool nIsConnected)
{  
    m_nIsConnected = nIsConnected; 
    if(!m_nIsConnected)
        Log(LOGLEVEL_ERROR, "Disconnect from(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort);
    else
        Log(LOGLEVEL_NOTICE, "Connected to(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort);    
    //
    if(m_pfnOnConnectChange)
        m_pfnOnConnectChange(m_nIsConnected);
}

void CTcpClient::OnReceive(char* pszReceive, const int nReceiveSize)
{        
    ACE_Message_Block *pFrame = new ACE_Message_Block(nReceiveSize);
    memcpy(pFrame->wr_ptr(), pszReceive, nReceiveSize);
    pFrame->wr_ptr(nReceiveSize); 
    this->putq(pFrame);
    /*if(m_pfnOnReceive)
        m_pfnOnReceive(pszReceive, nReceiveSize);*/
    delete[] pszReceive;
}

int CTcpClient::Disconnect()
{
    CloseSocket();
    OnConnectChange(false);
    return 0;
}

int CTcpClient::CloseSocket()
{
    //ACE_OS::shutdown(get_handle(), ACE_SHUTDOWN_BOTH);
    //int nRet = ACE_OS::closesocket(m_sockHandler.get_handle()); 
    this->peer().close();
    set_handle(ACE_INVALID_HANDLE);
    return 0;
}

int CTcpClient::Reconnect()
{    
    //已连接
    if (IsConnected())
        return 0; 
    //未设置重连机制
    if(!m_nReconnect)
    {
        Log(LOGLEVEL_INFO, "Reconnect is disabled.");
        return -1;
    }        
    //小于超时时间3s不能重连
    time_t tmNow;
    time(&tmNow); 
    if(abs(tmNow - m_tmLastConnect) <= CONNECTION_TIMEOUT)
        return -1;
    //清理Socket
    CloseSocket();
    return Connect();
}
 
int CTcpClient::Connect()
{ 
    //与服务器建立连接 
    CTcpClient *pSockHandler = this;
    //创建连接器
    ACE_Connector<CTcpClient, ACE_SOCK_CONNECTOR> connector;
    //设置默认连接超时
    ACE_Time_Value connTimeOut(CONNECTION_TIMEOUT);
    ACE_Synch_Options synch_option(ACE_Synch_Options::USE_TIMEOUT, connTimeOut);    
    //远程端点
    ACE_INET_Addr remoteEP(m_nRemotePort, m_strConnectIPAddress.c_str()); 
    Log(LOGLEVEL_INFO, "Connecting to(%s:%d) ...", remoteEP.get_host_addr(), remoteEP.get_port_number());
    //更新当前连接时间戳
    time(&m_tmLastConnect);
    int nRet = 0;
    if (m_nLocalPort > 0)
    {
        //绑定本地固定端口号
        ACE_INET_Addr localEP(m_nLocalPort);
        nRet = connector.connect(pSockHandler, remoteEP, synch_option, localEP);  
    }
    else
    {
        //绑定本地随机端口号
        nRet = connector.connect(pSockHandler, remoteEP, synch_option);
    }
    //连接失败
    if(nRet == -1)
    {
        //轮询切换连接主备服务器(存在)
        if(!m_strBackupIPAddress.empty() && m_strBackupIPAddress.compare(m_strHostIPAddress) != 0)
        {
            m_strConnectIPAddress = 
                m_strConnectIPAddress.compare(m_strHostIPAddress) == 0 ? m_strBackupIPAddress : m_strHostIPAddress;
        }  
        OnConnectChange(false);
        return -1;
    }
    //启动接收事件(OneTime)
    if(!nRet && m_lStop)
    {
        m_lStop = false; 
        this->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED);
    } 
    OnConnectChange(true);
    return 0;
}

int CTcpClient::svc()
{
    //接收
    while(!m_lStop)
    { 
        ACE_Time_Value tvSleep;
        tvSleep.msec(TASK_NAP_TIME_VALUE);
        ACE_OS::sleep(tvSleep);    
        ACE_Time_Value tvWaite(0, TASK_NAP_TIME_VALUE); 
        //BLOCKED
        this->reactor()->handle_events(&tvWaite);
    }
    return 0;    
}

int CTcpClient::open(void* param)
{
    return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK);
    //if (Base::open(param) == -1)
    //{
    //    Log(LOGLEVEL_ERROR, "open() Failied.");
    //    return -1;
    //}
    //return 0;
}

int CTcpClient::handle_input(ACE_HANDLE)
{    
    char *szBuffer = new char[DEFAULT_BUFFER_SIZE];
    //接收数据
    ssize_t length = this->peer().recv(szBuffer, DEFAULT_BUFFER_SIZE);
    //连接断开接收失败
    if(length <= 0) 
    {
        delete[] szBuffer; 
        return -1;//implicit call handle_close() clear up
    }
    OnReceive(szBuffer, length);
    return 0;
}

int CTcpClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{    
    int nRet = ACE_Event_Handler::handle_close(handle, closeMask);
    Disconnect();
    return nRet;
}

int CTcpClient::handle_output(ACE_HANDLE handle /* = ACE_INVALID_HANDLE */)
{
    //调用一次
    return 0;
}

long CTcpClient::Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut)
{ 
    //Confirmed
    //implicit call handle_close() clear up
    if(!IsConnected())
        return -1;
    
    ACE_Time_Value tvTimeout(0, nCommunicateTimeOut); 
    //return this->peer().recv((void *)szReceive, lReceiveSize, &tvTimeout);
    return this->peer().recv_n((void *)szReceive, lReceiveSize, &tvTimeout);
} 

long CTcpClient::Send(const char* szSend, long lSendSize, int nCommunicateTimeOut)
{
    //Uncertainty
    //implicit call handle_close() clear up
    if(!IsConnected())
        return -1;

    ACE_Time_Value tvTimeout(0, nCommunicateTimeOut);
    ssize_t length = this->peer().send_n(szSend, lSendSize, &tvTimeout);
    return length;
}
View Code
原文地址:https://www.cnblogs.com/wjshan0808/p/6612394.html