Thrift辅助类,用于简化Thrift编程

CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。

IDL定义:
service PackageManagerService
{
}

服务端使用示例:
CThriftServerHelper<CPackageManagerHandler, PackageManagerServiceProcessor> _thrift_server_helper;
return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

客户端使用示例:
CThriftClientHelper<PackageManagerServiceClient> thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);

thrift_client_helper.connect(); // 注意:调用方需要捕获并处理异常 TTransportException/TApplicationException/TException

#include <arpa/inet.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TSocketPool.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/async/TAsyncChannel.h>

// Makes the `apache` namespace visible so that the code below can spell Thrift
// names as `thrift::...` instead of `apache::thrift::...`.
// NOTE(review): this directive sits at file scope, so it also applies to any
// code that comes after this file in an including translation unit.
using namespace apache;

// Tells whether a transport exception type means "there is no usable
// connection". Two cases are covered:
// 1. NOT_OPEN:    the connection has never been opened
// 2. END_OF_FILE: the connection was closed by the peer
// Returns true for those two types and false for every other type.
inline bool thrift_not_connected(
        thrift::transport::TTransportException::TTransportExceptionType type)
{
    typedef thrift::transport::TTransportException TransportError;

    switch (type)
    {
    case TransportError::NOT_OPEN:
    case TransportError::END_OF_FILE:
        return true;
    default:
        return false;
    }
}

// Wraps the common steps needed to run a Thrift server.
//
// Template parameters:
//   ThriftHandler    - the service handler; default-constructed inside serve()
//   ServiceProcessor - the processor type; constructed from the handler
template <class ThriftHandler, class ServiceProcessor>
class CThriftServerHelper
{
public:
    // Starts the RPC service. The call is synchronous and blocking, so it
    // should be the last thing the caller does.
    //
    // serve(port)              - forwards to the three-argument overload with
    //                            "0.0.0.0" and a single worker thread
    // serve(port, num_threads) - forwards to the three-argument overload with
    //                            "0.0.0.0"
    // serve(ip, port, num_threads)
    //                          - builds the server objects and runs the server
    //
    // Returns false when a thrift::TException is caught while setting up or
    // running the server, true otherwise.
    bool serve(uint16_t port);
    bool serve(uint16_t port, uint8_t num_threads);
    bool serve(const std::string& ip, uint16_t port, uint8_t num_threads);

    // Calls stop() on the server object created by serve().
    void stop();

private:
    // All of the members below are (re)created by each call to serve().
    boost::shared_ptr<ThriftHandler> _handler;
    boost::shared_ptr<thrift::TProcessor> _processor;
    boost::shared_ptr<thrift::protocol::TProtocolFactory> _protocol_factory;
    boost::shared_ptr<thrift::server::ThreadManager> _thread_manager;
    boost::shared_ptr<thrift::concurrency::PosixThreadFactory> _thread_factory;
    boost::shared_ptr<thrift::server::TServer> _server;
};

// Wraps the common steps needed to use a Thrift client.
//
// The constructor builds the socket pool, framed transport, binary protocol
// and the generated client object; connect()/close() open and close the
// transport; operator-> and get() give access to the generated client so RPC
// methods can be called on it.
//
// Template parameter:
//   ThriftClient - the generated client class; constructed from the protocol
//
// NOTE(review): the destructor closes the transport, and copies of this object
// share the same transport through shared_ptr, so destroying one copy closes
// the connection used by the others. Avoid copying instances.
template <class ThriftClient>
class CThriftClientHelper
{
public:
    // host/port: address of the server to add to the socket pool.
    // timeout:   applied as the connect, receive and send timeout of the pool.
    //            RPC_TIMEOUT is expected to be defined elsewhere; the unit is
    //            presumably milliseconds (Thrift socket convention) - confirm.
    CThriftClientHelper(const std::string& host, uint16_t port
                      , int timeout = RPC_TIMEOUT);

    // Closes the transport if it is still open.
    ~CThriftClientHelper();

    // Opens the transport unless it is already open. Failures are reported by
    // exceptions thrown from the underlying transport.
    void connect();

    // Closes the transport if it is open.
    void close();

    // Access to the generated client, e.g. helper->some_rpc(...).
    ThriftClient* operator ->() const
    {
        return _container_client.get();
    }

    // Same pointer as operator->. Declared const so that it can be used on a
    // const helper, consistent with operator->.
    ThriftClient* get() const
    {
        return _container_client.get();
    }

private:
    std::string _host;
    uint16_t _port;
    int _timeout;
    boost::shared_ptr<thrift::transport::TSocketPool> _sock_pool;
    boost::shared_ptr<thrift::transport::TTransport> _socket;    // same object as _sock_pool
    boost::shared_ptr<thrift::transport::TFramedTransport> _transport;
    boost::shared_ptr<thrift::protocol::TProtocol> _protocol;
    boost::shared_ptr<ThriftClient> _container_client;
};

///////////////////////////////////////////////////////////////////////////////

// Convenience overload: serves on `port` using the "0.0.0.0" address and a
// single worker thread. Blocks like the three-argument overload it forwards to
// and returns that overload's result.
template <class ThriftHandler, class ServiceProcessor>
bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port)
{
    const std::string any_address("0.0.0.0");
    const uint8_t single_thread = 1;

    return this->serve(any_address, port, single_thread);
}

// Convenience overload: serves on `port` with `num_threads` worker threads,
// using the "0.0.0.0" address. Blocks like the three-argument overload it
// forwards to and returns that overload's result.
template <class ThriftHandler, class ServiceProcessor>
bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port, uint8_t num_threads)
{
    const std::string any_address("0.0.0.0");

    return this->serve(any_address, port, num_threads);
}

// Creates the handler, processor, binary protocol factory and a simple thread
// manager with `num_threads` workers, then runs a TNonblockingServer on `port`.
//
// The call blocks inside _server->serve() for as long as the server runs, so
// it should be the last thing the caller does.
//
// ip:          NOTE(review): accepted but never used in this function - the
//              server is constructed from the port only. Callers that pass a
//              specific address should not assume the server is bound to it.
// port:        port handed to TNonblockingServer.
// num_threads: number of worker threads for the thread manager.
//
// Returns false if a thrift::TException is caught during setup or while
// serving (the error is logged), true once serve() has returned normally.
// Exceptions of other types are not caught here.
template <class ThriftHandler, class ServiceProcessor>
bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(const std::string& ip, uint16_t port, uint8_t num_threads)
{
    try
    {
        _handler.reset(new ThriftHandler);
        _processor.reset(new ServiceProcessor(_handler));
        _protocol_factory.reset(new thrift::protocol::TBinaryProtocolFactory());
        _thread_manager = thrift::server::ThreadManager::newSimpleThreadManager(num_threads);
        _thread_factory.reset(new thrift::concurrency::PosixThreadFactory());

        _thread_manager->threadFactory(_thread_factory);
        _thread_manager->start();

        _server.reset(new thrift::server::TNonblockingServer(
                _processor, _protocol_factory, port, _thread_manager));

        // Log before entering the blocking call: anything placed after
        // serve() only runs once the server has already stopped.
        LOG(INFO) << "container-thrift start";
        _server->serve();
    }
    catch (const thrift::TException& tx)
    {
        LOG(ERROR) << "start thrift error: " << tx.what();
        return false;
    }

    return true;
}

// Asks the server created by serve() to stop.
// Does nothing when no server object exists yet (serve() has not been called,
// or it failed before the server was created), instead of dereferencing an
// empty shared_ptr.
template <class ThriftHandler, class ServiceProcessor>
void CThriftServerHelper<ThriftHandler, ServiceProcessor>::stop()
{
    if (_server)
    {
        _server->stop();
    }
}

///////////////////////////////////////////////////////////////////////////////

// Builds the whole client stack without opening any connection:
//   socket pool (one server, same timeout for connect/recv/send)
//     -> framed transport -> binary protocol -> generated client.
// Call connect() afterwards to actually open the transport.
template <class ThriftClient>
CThriftClientHelper<ThriftClient>::CThriftClientHelper(
        const std::string& host, uint16_t port, int timeout)
        : _host(host)
        , _port(port)
        , _timeout(timeout)
{
    // Configure the pool through a local first, then publish it to the members.
    boost::shared_ptr<thrift::transport::TSocketPool> pool(
            new thrift::transport::TSocketPool());
    pool->addServer(_host, _port);
    pool->setConnTimeout(_timeout);
    pool->setRecvTimeout(_timeout);
    pool->setSendTimeout(_timeout);

    _sock_pool = pool;
    _socket = _sock_pool; // the pool doubles as the underlying transport

    // Layer framing and the binary protocol on top of the socket pool.
    _transport.reset(new thrift::transport::TFramedTransport(_socket));
    _protocol.reset(new thrift::protocol::TBinaryProtocol(_transport));

    _container_client.reset(new ThriftClient(_protocol));
}

// Closes the transport if it is still open.
// Any exception raised while closing is contained here: an exception leaving
// a destructor terminates the program when it happens during stack unwinding
// (and always under C++11's implicitly-noexcept destructors).
template <class ThriftClient>
CThriftClientHelper<ThriftClient>::~CThriftClientHelper()
{
    try
    {
        close();
    }
    catch (const thrift::TException& tx)
    {
        LOG(ERROR) << "close thrift error: " << tx.what();
    }
    catch (...)
    {
        // Nothing useful can be done here; the object is going away anyway.
    }
}

// Opens the transport. Calling it while the transport is already open is a
// no-op. Errors from the underlying open() call propagate to the caller as
// exceptions.
template <class ThriftClient>
void CThriftClientHelper<ThriftClient>::connect()
{
    const bool already_open = _transport->isOpen();
    if (already_open)
    {
        return;
    }

    _transport->open();
}

// Closes the transport. Calling it while the transport is not open is a no-op.
template <class ThriftClient>
void CThriftClientHelper<ThriftClient>::close()
{
    const bool currently_open = _transport->isOpen();
    if (!currently_open)
    {
        return;
    }

    _transport->close();
}


原文地址:https://www.cnblogs.com/aquester/p/9891605.html