boost::asio::udp 异步

// Wrap an io_service, kept alive by a work guard
#pragma
once #include <vector> #include <thread> #include <string> #include <mutex> #include <boost/serialization/singleton.hpp> #include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/asio.hpp> using boost::asio::ip::udp; using boost_ec = boost::system::error_code; using thread_ptr = std::shared_ptr<std::thread>; using work_ptr = std::unique_ptr<boost::asio::io_service::work>;#define ioService CIoService::get_mutable_instance() class CIoService : public boost::serialization::singleton<CIoService> { public: CIoService(): m_work(m_ios){} //多个线程调用io_server事件 void Run(uint16_t count) { std::call_once(m_of, [&]() { for (uint16_t i = 0; i < count; ++i) { std::shared_ptr<std::thread> t(new std::thread([&]() { boost::system::error_code ec; m_ios.run(ec); })); m_vtr_thread.emplace_back(t); } }); } void Stop() { m_ios.stop(); } boost::asio::io_service& GetIoService() { return m_ios; } protected: boost::asio::io_service m_ios; boost::asio::io_service::work m_work; std::vector<thread_ptr> m_vtr_thread; std::once_flag m_of; };
//封装通信数据包
// One UDP datagram plus its addressing information.
struct STUDPPacket
{
    enum { MAX_DATA_LEN = 2048 };

    char data[MAX_DATA_LEN] = {};   // payload buffer, zero-filled on construction
    size_t len{ 0 };                // number of valid bytes in data
    std::string src_addr;           // source address
    std::string dst_addr;           // destination address; for outgoing packets, the IP to send to
    uint16_t src_port{ 0 };         // source port
    uint16_t dst_port{ 0 };         // destination port; for outgoing packets, the port to send to

    STUDPPacket() {}
};
//定义一个 CUDPServer类
class CUDPServer
{
    using sync_recv_queue = boost::concurrent::sync_queue<STUDPPacketPtr>;
    using sync_send_queue = boost::concurrent::sync_queue<STUDPPacketPtr>;

public:
    CUDPServer() {};
    ~CUDPServer() {};

    boost_ec Start(boost::asio::io_service& ios, uint16_t port);
protected:
    void DoRecv();

    void HandleRecvPack();
    void HandleSendPack();
protected:
    udp::endpoint m_sender_endpoint;        //当接收到数据时,填入数据发送端的地址
    std::shared_ptr<udp::socket> m_socket;    //socket绑定本地UDP端口
    sync_recv_queue m_recv_queue;            //UDP数据包接收队列
    sync_recv_queue m_send_queue;            //UDP数据包发送队列
    std::vector<thread_ptr> m_vtr_thread;    //数据发送线程,接收到的数据处理线程
};
//异步实现udp server

#include "UDPServer.h"

#include <cstddef>
#include <cstdio>
#include <memory>
#include <thread>
#include <utility>

//------------------------------------------------------------------------
// Function: Start
// Returns:  boost_ec -- empty on success or when already started; otherwise
//           the error reported while opening or binding the socket
// Param:    boost::asio::io_service& ios -- io_service the socket is created on
// Param:    uint16_t port -- local UDP port to bind (IPv4)
// Notes:    Binds the UDP port, posts the first asynchronous receive and
//           starts one sender thread plus two receive-handler threads.
//------------------------------------------------------------------------
boost_ec CUDPServer::Start(boost::asio::io_service& ios, uint16_t port)
{
    if (m_socket)
        return boost_ec();

    // Open and bind through the error_code overloads so that a failure
    // (e.g. port already in use) is returned instead of thrown.
    boost_ec ec;
    auto sock = std::make_shared<udp::socket>(ios);
    sock->open(udp::v4(), ec);
    if (ec)
        return ec;
    sock->bind(udp::endpoint(udp::v4(), port), ec);
    if (ec)
        return ec;
    m_socket = std::move(sock);

    // Receive data asynchronously.
    DoRecv();

    // A single thread is responsible for sending UDP data.
    m_vtr_thread.emplace_back(
        std::make_shared<std::thread>(&CUDPServer::HandleSendPack, this));

    // Several threads process received data concurrently.
    for (int i = 0; i < 2; ++i) {
        m_vtr_thread.emplace_back(
            std::make_shared<std::thread>(&CUDPServer::HandleRecvPack, this));
    }

    return boost_ec();
}

//------------------------------------------------------------------------
// Function: DoRecv
// Returns:  void
// Notes:    Posts one asynchronous receive into a fresh packet. The
//           completion handler queues the packet and posts the next receive.
//           The handler captures `this`, so this object must stay alive
//           while a receive is outstanding.
//------------------------------------------------------------------------
void CUDPServer::DoRecv()
{
    STUDPPacketPtr pack(new STUDPPacket);
    m_socket->async_receive_from(
        boost::asio::buffer(pack->data, STUDPPacket::MAX_DATA_LEN),
        m_sender_endpoint,
        [this, pack](boost::system::error_code ec, std::size_t bytes_recvd) {
            // The receive was cancelled (socket closed / shut down):
            // re-posting would fail again immediately, so stop here.
            if (ec == boost::asio::error::operation_aborted)
                return;

            if (!ec && bytes_recvd > 0) {
                boost_ec addr_ec;   // kept separate from the receive result
                pack->len = bytes_recvd;
                pack->src_addr = m_sender_endpoint.address().to_string(addr_ec);
                pack->src_port = m_sender_endpoint.port();
                m_recv_queue.push(pack);
            }

            DoRecv();
        });
}

//------------------------------------------------------------------------
// Function: HandleRecvPack
// Returns:  void
// Notes:    Thread body: blocks on the receive queue and pulls packets.
//           The pulled packet is not processed further here.
//           Returns (ending the thread) once the queue is closed.
//------------------------------------------------------------------------
void CUDPServer::HandleRecvPack()
{
    while (true) {
        STUDPPacketPtr pack;
        const boost::queue_op_status st = m_recv_queue.wait_pull(pack);
        if (st == boost::queue_op_status::closed) {
            // End only this thread instead of calling exit() from a worker.
            return;
        }
    }
}

//------------------------------------------------------------------------
// Function: HandleSendPack
// Returns:  void
// Notes:    Thread body: blocks on the send queue and sends each packet to
//           pack->dst_addr : pack->dst_port. Packets whose destination
//           address cannot be parsed are dropped. Returns once the queue
//           is closed.
//------------------------------------------------------------------------
void CUDPServer::HandleSendPack()
{
    while (true) {
        STUDPPacketPtr pack;
        const boost::queue_op_status st = m_send_queue.wait_pull(pack);
        if (st == boost::queue_op_status::closed) {
            // End only this thread instead of calling exit() from a worker.
            return;
        }
        if (!pack) {
            continue;   // nothing to send
        }

        boost_ec ec;
        const boost::asio::ip::address addr =
            boost::asio::ip::address::from_string(pack->dst_addr, ec);
        if (ec) {
            continue;
        }

        // Never read past the packet buffer, whatever len claims.
        const std::size_t send_len =
            pack->len < sizeof(pack->data) ? pack->len : sizeof(pack->data);

        m_socket->send_to(boost::asio::buffer(pack->data, send_len),
                          udp::endpoint(addr, pack->dst_port), 0, ec);
        if (ec) {
            printf("send data error, ip: %s, port: %d, msg:%s ",
                   pack->dst_addr.c_str(), pack->dst_port, ec.message().c_str());
        }
    }
}
//调用

// Use one thread to run the io_service

ioService.Run(1);
//定义UDPServer对象, 在对象内部实现数据收发 CUDPServer udpServerMgr;
udpServerMgr.Start(ioService.GetIoService,
7300);
原文地址:https://www.cnblogs.com/osbreak/p/14564094.html