boost asio tcp 多线程异步读写,服务器与客户端。

  1 // server.cpp
  2 
  3 #if 0
  4 多个线程对同一个io_service 对象处理
  5 用到第三方库:log4cplus, google::protobuf
  6 用到C++11的特性,Windows 需要用到vs2013 gcc 4.8
  7 #endif
  8 
  9 #include <iostream>
 10 #include <thread>
 11 #include <vector>
 12 
 13 #include <boost/asio.hpp>
 14 
 15 #include <boost/shared_array.hpp>
 16 #include <boost/make_shared.hpp>
 17 #include <boost/function.hpp>
 18 #include <boost/bind.hpp>
 19 
 20 #include <common.pb.h>
 21 
 22 void async_accept();
 23 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
 24     const boost::system::error_code &ec);
 25 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
 26 void handle_head(
 27     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
 28     boost::shared_array<char> sa_len,
 29     const boost::system::error_code &ec,
 30     std::size_t bytes_transfered);
 31 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
 32 void handle_proto(
 33     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
 34     boost::shared_array<char> sa_data,
 35     const boost::system::error_code &ec,
 36     std::size_t bytes_transfered);
 37 
 38 boost::asio::io_service io_svc;
 39 boost::asio::ip::address_v4 lis_ip; // 默认监听本机所有IP
 40 boost::asio::ip::tcp::endpoint lis_ep(lis_ip, 20017);
 41 boost::asio::ip::tcp::acceptor acceptor(io_svc, lis_ep);
 42 
 43 #include <Log.h>    // log4cplus 的相关头文件,这里不再一个一个敲出来了。
 44 
 45 log4cplus::Logger *gLog = nullptr;
 46 
 47 static const int PACKAGE_LENGTH = 16;
 48 
 49 int main(int argc, char *argv[])
 50 {
 51     log4cplus::initialize();
 52 
 53     static log4cplus::Logger s_log = log4cplus::Logger::getInstance("server");
 54     gLog = &s_log;
 55 
 56     LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
 57 
 58     for (int i = 0; i < 5; ++i)
 59     {
 60         async_accept();
 61     }
 62 
 63     // 捕获信号
 64     boost::asio::signal_set signals_(io_svc);
 65     signals_.add(SIGINT);
 66     signals_.add(SIGTERM);
 67     signals_.async_wait([](const boost::system::error_code &ec, int sig)
 68     {
 69         LOG4CPLUS_INFO_FMT(*gLog, "signal: %d, error_message: %s", 
 70             sig, ec.message().c_str());
 71         io_svc.stop();
 72     });
 73 
 74     std::vector<std::thread> vecThread;
 75     for (int i = 0; i < 10; ++i)
 76     {
 77         vecThread.emplace_back(std::thread([](){
 78             LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
 79             io_svc.run();
 80             LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
 81         }));
 82     }
 83 
 84     for (size_t i = 0; i < vecThread.size(); ++i)
 85     {
 86         vecThread[i].join();
 87     }
 88     assert(io_svc.stopped();
 89 
 90 #ifdef WIN32
 91     system("pause");
 92 #endif
 93 
 94     return 0;
 95 }
 96 
 97 // 标记异步监听,投放到指定io_service 对象中
 98 void async_accept()
 99 {
100     LOG4CPLUS_INFO_FMT(*gLog, "async_accept waitting...");
101 
102     boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock 
103         = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(ios_svc));
104 
105     boost::function<void(const boost::system::error_code &> cb_accept;
106     cb_accept = boost::bind(handle_accept, new_sock, _1);
107     acceptor.async_accept(*new_sock, cb_accept);
108 }
109 
110 // 监听返回的处理
111 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
112     const boost::system::error_code &ec)
113 {
114     if (ec != 0)
115     {
116         LOG4CPLUS_INFO(*gLog, "accept failed: " << ec.message());
117 
118         return;
119     }
120     LOG4CPLUS_INFO(*gLog, "a new client connected. " << new_conn->remote_endpoint());
121 
122     async_read_head(new_conn);
123 
124     // 处理下一个连接,每次处理完了之后,需要再次accept.
125     // 否则io_service 将只处理一次,然后结束监听。
126     // 所以这里可以处理一个情况,就是当你要结束监听的时候,人要在这里return
127     // 那么io_service 的run() 函数就会stop. 但如果还有其他的异步操作被记录, 
128     // run() 函数还是会继续运行,以处理其他的异步操作。
129     async_accept();
130 }
131 
132 // 对一个指定的连接标记异步读头部,然后投放到io_service 对象
133 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
134 {
135     // 固定报文头长度为${PACKAGE_LENGTH} 个字节
136     boost::shared_array<char> sa_len(new char[PACKAGE_LENGTH]);
137 
138     // 回调函数
139     boost::function<void(const boost::system::error_code &, std::size_t)> cb_msg_len;
140     cb_msg_len = boost::bind(handle_head, conn, sa_len, _1, _2);
141 
142     // 异步读,读一个报文的长度,boost::asio::async_read() 函数有个特点,
143     // 它会将这里指定的buffer 缓冲区读满了才会回调handle_head 函数。
144     boost::asio::async_read(
145         *conn, boost::asio::buffer(sa_len.get(), PACKAGE_LENGTH), cb_msg_len);
146 }
147 
148 // 头部数据完整读取后的处理函数
149 void handle_head(
150     boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
151     boost::shared_array<char> sa_len, 
152     const boost::system::error_code &ec, 
153     std::size_t bytes_transfered)
154 {
155     if (!conn->is_open())
156     {
157         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
158         return ;
159     }
160 
161     if (ec != 0)
162     {
163         if (ec == boost::asio::error::eof)
164         {
165             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
166         }
167         else
168         {
169             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
170         }
171 
172         return ;
173     }
174 
175     // 这里对的数据做处理
176     assert(bytes_transfered == PACKAGE_LENGTH);
177     int32_t len_net = 0;    // 网络字节序:数据部分长度
178     int32_t len_loc = 0;    // 本地字节序:数据部分长度
179     memcpy(&len_net, sa_len.get(), sizeof(len_net));
180     len_loc = boost::asio::detail::socket_ops::network_to_host_long(len_net);
181     LOG4CPLUS_INFO_FMT(*gLog, "nLenLoc: %d", len_loc);
182 
183     async_read_proto(conn, len_loc);
184 }
185 
186 // 对一个指定的连接标记异步读数据部,然后投放到io_service 对象
187 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
188 {
189     // 数据部分
190     boost::shared_array<char> sa_data(new char[len]());
191 
192     // 回调函数
193     boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
194     cb_proto = boost::bind(handle_proto, conn, sa_data, _1, _2);
195 
196     boost::asio::async_read(*conn, 
197         boost::asio::buffer(sa_data.get(), len), cb_proto);
198 }
199 
200 // 数据部分读完整后的处理函数
201 void handle_proto(
202     boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
203     boost::shared_array<char> sa_data, 
204     const boost::system::error_code &ec, 
205     std::size_t bytes_transfered)
206 {
207     if (!conn->is_open())
208     {
209         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
210         return ;
211     }
212 
213     if (ec != 0)
214     {
215         if (ec == boost::asio::error::eof)
216         {
217             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
218         }
219         else
220         {
221             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
222         }
223         return ;
224     }
225 
226     // 处理这个proto 数据
227     // 这里将这个数组转换成一个proto, 然后处理这个proto
228     MessageHead pro;
229     if (!pro.ParseFromArray(sa_data.get(), (int32_t)bytes_transfered))
230     {
231         LOG4CPLUS_ERROR_FMT(*gLog, "ParseFromArray() failed");
232         return ;
233     }
234 
235     int port = conn->remote_endpoint().port();
236     LOG4CPLUS_INFO_FMT(*gLog, "port: %d
%s", port, pro.DebugString().c_str()0;
237 
238     // 处理完了之后,类似accept 的异步调用一样,需要继续调用异步的读数据
239     // 同样的,如果要结束一个连接,正常的结算应该在这里return 调用。
240     // 当然了,使用socket 的close(), shut_down() 函数也可以关闭这个连接。
241     async_read_head(conn);
242 }
View Code
  1 // client.cpp
  2 
  3 #include <iostream>
  4 #include <thread>
  5 #include <vector>
  6 
  7 #include <boost/asio.hpp>
  8 
  9 #include <boost/shared_array.hpp>
 10 #include <boost/make_shared.hpp>
 11 #include <boost/function.hpp>
 12 #include <boost/bind.hpp>
 13 
 14 #include <boost/pool/pool.hpp>
 15 #include <boost/pool/singleton_pool.hpp>
 16 
 17 // proto buffer 生成的头文件
 18 #include <common.pb.h>
 19 
 20 void async_connect();
 21 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
 22     const boost::system::error_code &ec);
 23 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
 24 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
 25     const std::shared_ptr<std::string> sp_data_proto,
 26     const boost::system::error_code &ec,
 27     std::size_t bytes_transfered);
 28 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
 29     const std::shared_ptr<std::string> sp_data_proto, 
 30     const boost::system::error_code &ec, std::size_t bytes_transfered);
 31 
 32 
 33 
 34 boost::asio::io_service             io_svc;
 35 boost::asio::ip::tcp::endpoint      svr_ep(
 36     boost::asio::ip::address_v4::from_string("127.0.0.1"), 20017);
 37 
 38 #include <Log.h>    // log4cplus 的相关头文件,这里不再一个一个敲出来了。
 39 
 40 log4cplus::Logger *gLog = nullptr;  // 应该是直接使用对象,但是懒得改就保留了。
 41 
 42 // 包头固定长度
 43 static const int PACKAGE_LENGTH = 16;
 44 
 45 using pool_head   = boost::singleton_pool<struct struHead, PACKAGE_LENGTH>;
 46 using pool_string = boost::singleton_pool<struct struString, sizeof(std::string)>;
 47 
 48 std::shared_ptr<std::string> createSharedString()
 49 {
 50     std::shared_ptr<std::string> spTp(new (pool_string::malloc()) std::string, 
 51         [](std::string *tp)
 52     {
 53         tp->~basic_string();
 54         pool_string::free(tp);
 55     });
 56 
 57     return spTp;
 58 }
 59 
 60 int main(int argc, char *argv[])
 61 {
 62     log4cplus::initialize();
 63 
 64     static log4cplus::Logger s_log = log4cplus::Logger::getInstance("client");
 65     gLog = &s_log;
 66     assert(gLog != nullptr);
 67 
 68     LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
 69 
 70     for (int i = 0; i < 50; ++i)
 71     {
 72         async_connect();
 73     }
 74 
 75     std::vector<std::thread> vecThread;
 76     for (int i = 0; i < 5; ++i)
 77     {
 78         vecThread.emplace_back(std::thread([]() {
 79             LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
 80             io_svc.run();
 81             LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
 82         }));
 83     }
 84 
 85     for (size_t i = 0; i < vecThread.size(); ++i)
 86     {
 87         vecThread[i].join();
 88     }
 89     assert(io_svc.stopped());
 90 
 91 #ifdef WIN32
 92     system("pause");
 93 #endif
 94 
 95 
 96     return 0;
 97 }
 98 
 99 void async_connect()
100 {
101     LOG4CPLUS_INFO_FMT(*gLog, "async_connect waitting...");
102 
103     boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
104         = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(io_svc));
105 
106     new_sock->async_connect(svr_ep, boost::bind(
107         handle_connect, new_sock, 
108         boost::asio::placeholders::error));
109 }
110 
111 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn, 
112     const boost::system::error_code &ec)
113 {
114     if (ec != 0)
115     {
116         LOG4CPLUS_INFO(*gLog, "connect failed: " << ec.message());
117         return ;
118     }
119 
120     LOG4CPLUS_INFO(*gLog, "connect success, server: " << new_conn->remote_endpoint());
121 
122     async_write(new_conn);
123 }
124 
125 #if 0
126 message messageHead
127 {
128     optional uint32 FunCode     = 1;
129     optional uint32 RequestID   = 2;
130     optional uint32 AccountId   = 3;
131     optional uint32 AccessId    = 4;
132     optional int64  ClientTime  = 5;
133     optional uint32 GoodsId     = 6;
134     optional bytes  UUID        = 7;
135 }
136 #endif
137 
138 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
139 {
140     MessageHead pro;
141     pro.set_funcode(9527);
142     pro.set_requestid(10081);
143     pro.set_accountid(49005);
144     pro.set_clienttime(time(NULL));
145     pro.set_goodsid(35023);
146     pro.set_uuid(std::string("uuid_500384"));
147 
148     std::shared_ptr<std::string> sp_data = createSharedString();
149     if (!pro.SerializeToString(sp_data.get())
150     {
151         LOG4CPLUS_ERROR_FMT(*gLOg, "SerializeToString failed.");
152 
153         return ;
154     }
155 
156     LOG4CPLUS_INFO_FMT(*gLog, "data.size() = %lld", sp_data->size());
157 
158     char    ch_head[PACKAGE_LENGTH] = {};
159     int32_t len_net = boost::asio::detail::socket_ops::host_to_network_long((int32_t)sp_data->size());
160     memcpy(ch_head, &len_net, sizeof(len_net));
161 
162     if (sp_data->size() == 0)
163     {
164         return ;
165     }
166 
167     boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_head;
168     cb_write_head = boost::bind(handle_write_head, conn, sp_data, _1, _2);
169     boost::asio::async_write(*conn, boost::asio::buffer(ch_head, PACKAGE_LENGTH), cb_write_head);
170 }
171 
172 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
173     const std::shared_ptr<std::string> sp_data_proto,
174     const boost::system::error_code &ec, 
175     std::size_t bytes_transfered)
176 {
177     if (!conn->is_open())
178     {
179         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
180         return;
181     }
182 
183     if (ec != 0)
184     {
185         if (ec == boost::asio::error::eof)
186         {
187             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
188         }
189         else
190         {
191             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
192         }
193 
194         return ;
195     }
196 
197     boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_proto;
198     cb_write_proto = boost::bind(handle_write_proto, conn, sp_data_proto, _1, _2);
199     boost::asio::async_write(*conn, boost::asio::buffer(*sp_data_proto), cb_write_proto);
200 }
201 
202 // 这里的sp_data_proto 在该函数中并不需要使用,用它作参数的唯一作用,就是保留它的生命周期,
203 // 保证在数据写完之前它不会被析构。
204 // 因为,如果该对象在async_write 还未写完之前就被析构的话, 就会造成数据的错乱,最终导致对端的数据是错误的。
205 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
206     const std::shared_ptr<std::string> sp_data_proto, 
207     const boost::system::error_code &ec, std::size_t bytes_transfered)
208 {
209     if (!conn->is_open())
210     {
211         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
212         return ;
213     }
214 
215     if (ec != 0)
216     {
217         if (ec == boost::asio::error::eof)
218         {
219             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
220         }
221         else
222         {
223             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
224         }
225 
226         return ;
227     }
228 
229     LOG4CPLUS_INFO(*gLog, "write proto finished.");
230     // 数据写完了之后,可以读对端发送过来的数据。
231     // 如果不再读对端的数据,直接该socket 将会被断开。
232     //async_read_head(conn);
233 }
View Code
原文地址:https://www.cnblogs.com/suyunhong/p/7120882.html