Rabbitmq解决连接时阻塞的问题(amqp_open_socket)

在使用接口Channel::Create()连接到rabbitmq时,如果网络中断或者ip端口地址不对的时候,程序就会一直阻塞在这个调用上,没有 返回值没有异常提示,这种情况如果你想提示个错误什么的就无能为力了,Panda工作中也遇到这个问题,我想:如果他能提供一个连接超时异常就好了,毕竟 SimpleAmqpClient只是对另外一个c语言开源项目rabbitmq-c的封装,而且我记得rabbitmq-c是支持我所说的功能的。下面 请跟随我一起一步一步完成这个事情吧。

 1
1 int m_nSockfd;
2 int m_nChannelIdSend;
3 int m_nChannelIdReve;
4 int m_nChannelIdResult;
5 amqp_connection_state_t m_Connection;
6 amqp_bytes_t m_stReply_to_queue;
View Code
       m_Connection = amqp_new_connection();
 2     m_nSockfd = amqp_open_socket(m_strIp.toLocal8Bit().data(), m_nPort);
 3     amqp_set_sockfd(m_Connection, m_nSockfd);
 4     amqp_login(m_Connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,m_strRabbitUser.toLocal8Bit().data(), m_strRabbitPwd.toLocal8Bit().data());
 5 
 6     //生产者
 7     amqp_channel_open(m_Connection, m_nChannelIdSend);
 8     amqp_get_rpc_reply(m_Connection);
 9     amqp_exchange_declare(m_Connection, m_nChannelIdSend, amqp_cstring_bytes("ping") , Type,
10                           0,1,0,0, amqp_empty_table);//绑定交换器 amqp_cstring_bytes("ping")
11 
12     m_strExchange = "ping";
13     m_strRoutingkey = "rpc";
14     m_pProducer1 = new CMqProducerThread(m_Connection, m_nChannelIdSend, m_strExchange, m_strRoutingkey, this);
15     connect(m_pProducer1, SIGNAL(SendProcess(int, QString)), this, SLOT(SetProcess(int, QString)));
16     m_pProducer1->start();
17 
18     //测试结果上传
19     amqp_channel_open(m_Connection, m_nChannelIdResult);
20     amqp_get_rpc_reply(m_Connection);
21     amqp_exchange_declare(m_Connection, m_nChannelIdResult, amqp_cstring_bytes("testResult") , Type,
22                           0,1,0,0, amqp_empty_table);
23     m_strExchange = "testResult";
24     m_strRoutingkey = "result";
25     m_pResoultThread = new MQResultThread(m_Connection, m_nChannelIdResult, m_strExchange, m_strRoutingkey, this);

先来看一下Channel::Channel(…) 

然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数

代码实现
有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下: 
在Channel.h增加两个函数

    /**
     * 以非阻塞的方法创建Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,为NULL时采用阻塞方式打开
     */
    explicit Channel(const std::string &host,
        int port,
        const std::string &username,
        const std::string &password,
        const std::string &vhost,
        int frame_max,
        timeval*
        );

    /**
     * 工厂方法
     * 以非阻塞的方法创建Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,为NULL时采用阻塞方式打开
     */
    static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1",
        int port = 5672,
        const std::string &username = "guest",
        const std::string &password = "guest",
        const std::string &vhost = "/",
        int frame_max = 131072,
        timeval* timeout = NULL)
    {
        return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout);
    }

然后在Channel.cpp实现

Channel::Channel(const std::string &host,
                 int port,
                 const std::string &username,
                 const std::string &password,
                 const std::string &vhost,
                 int frame_max,
                 timeval* timeout) :
    m_impl(new Detail::ChannelImpl)
{
    m_impl->m_connection = amqp_new_connection();

    if (NULL == m_impl->m_connection)
    {
        throw std::bad_alloc();
    }

    try
    {
        amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
        int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
        }

        //如果连接超时,下面这一行就会抛出异常
        m_impl->CheckForError(sock);

        m_impl->DoLogin(username, password, vhost, frame_max);
    }
    catch (...)
    {
        amqp_destroy_connection(m_impl->m_connection);
        throw;
    }

    m_impl->SetIsConnected(true);
}

使用例子如下:

int main()
{
    timeval tv = {0};
    tv.tv_usec = 200 * 1000; //等待200毫秒
    try 
    {
        Channel::ptr_t channel = Channel::CreateNoBlock(
        "127.0.0.1", 5567"guest""guest""/", 131072, &tv);
        ...
        ...
    } catch (AmqpLibraryException& ex)
    {
        //提示连接失败;
    }
    return 0;
}
原文地址:https://www.cnblogs.com/Lijcyy/p/9045895.html