A C++20 Coroutine Example: A Coroutine-Based IOCP Server/Client

VC has supported coroutines for quite a while now. For the longest time I couldn't figure out what the point of coroutines was, until inspiration suddenly struck a few days ago on the toilet:

Here is some pseudocode:

task server() {
    for (;;) {
        sock_context s = co_await io.accept();
        for (;;) {
            auto buf = co_await io.recv(s);
            if (!buf.length())
                break;

            std::cout << buf.data() << std::endl;
            int n = co_await io.send(s, "Got it!", strlen("Got it!") + 1);
        }
        co_await io.close(s);
    }
}

If an IO library's public interface looked like the above, the code structure would read exactly like the simplest blocking model, yet internally it would be asynchronous: the same single-threaded code could serve a whole pile of connections.

That is what led to the work below (done purely out of idle boredom), and fortunately it produced a finished result.

In the end I finally understood what coroutines are for:

  The more libraries get coroutine-ized, the lower the barrier to entry for C++ programmers: application-level developers no longer need to know the coroutine internals, only how to use the library correctly.

There are already plenty of articles explaining coroutine internals, so I won't write another one. I'll just post the code; if you are interested, use my implementation together with those articles to build your own:

2021/12/23: I recently used this library in a side application to test the waters; after a round of patching and fixing, it holds up quite well.

2021/12/23: Note: it is best not to use a lambda as the coroutine function. It may or may not blow up; this is black magic caused by a compiler bug.
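
For example, a minimal sketch of the workaround (it reuses the echo-server logic from the test code at the end of this post): define a named function and pass it wherever the test code passes a lambda:

// Workaround sketch: a named free function as the coroutine instead of a lambda,
// to sidestep the inline-lambda codegen bug described above.
static aqx::netio::task echo_server(aqx::coio io) {
    for (;;) {
        auto s = co_await io.accept();
        for (;;) {
            auto buf = co_await io.recv(s);
            if (!buf.length())
                break;
            s.send(buf.data(), (int)buf.length());
            buf.clear();
        }
        co_await io.close(s);
    }
}
// Then: nio.server(echo_server, aqx::netio::listener("0.0.0.0:55554", 100, 100));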

#pragma once
#include <WinSock2.h>
#include <MSWSock.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
#include <coroutine>
#include <string>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <algorithm>
#include "logger.hpp"
#include <random>


/**
* I recently spent some time learning C++20 coroutines and took a first pass at coroutine-izing this IOCP network IO library.
* The previous version dispatched through callbacks; the various contexts needed by upper-layer protocol parsing forced the library to be templated.
* With coroutines, that protocol context can now live inside the coroutine function, which removes the templating and makes the library a tiny bit easier to maintain.
* How many pitfalls coroutines still hide is unknown; whether this is a win remains to be seen.
* Using coroutines means this library gives up almost all of its multithreading ability:
* I admit I don't have the brains to maintain an IO library that is multithreaded on the inside and coroutine-skinned on the outside.
* My current stance is no overdesign, just clean and simple upper-layer code; 100k+ concurrent connections is enough for me.
* If that ever stops being enough, it means it's time to drop coroutines. Coroutines are not free: in my tests,
* they cost about 15% in performance compared with callback dispatch.
*/
#pragma warning(push)
#pragma warning(disable:4996)
namespace aqx{

    static int init_winsock() {
        WSADATA wd;
        return WSAStartup(MAKEWORD(2, 2), &wd);
    }

    static aqx::log nlog;

#ifndef _nf
#define _nf ((size_t)-1)
#endif
#ifndef __AQX_TIME_HPP
#define __AQX_NOW_FUNC
    using clock64_t = long long;
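    // Monotonic clock built on the MSVC STL's internal QueryPerformanceCounter wrappers;
    // the period parameter defaults to milliseconds.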
    template<typename period = std::milli>
    clock64_t now() {
        const clock64_t _Freq = _Query_perf_frequency();
        const clock64_t _Ctr = _Query_perf_counter();
        const clock64_t _Whole = (_Ctr / _Freq) * period::den;
        const clock64_t _Part = (_Ctr % _Freq) * period::den / _Freq;
        return _Whole + _Part;
    }
#endif

    /** 
    * Opcode and status code definitions
    */
    struct net_status {
        static constexpr unsigned int s_accept = 0x01;
        static constexpr unsigned int s_connect = 0x02;
        static constexpr unsigned int s_read = 0x04;
        static constexpr unsigned int s_write = 0x08;
        static constexpr unsigned int s_close = 0x10;
        static constexpr unsigned int s_exec = 0x20;

        static constexpr unsigned int t_activated = 0x40;

        static constexpr unsigned int t_acceptor = 0x0100;
        static constexpr unsigned int t_connector = 0x0200;
        static constexpr unsigned int t_await_undo = 0x0400;

        static constexpr unsigned int t_await_accept = 0x010000;
        static constexpr unsigned int t_await_connect = 0x020000;
        static constexpr unsigned int t_await_read = 0x040000;
        static constexpr unsigned int t_await_write = 0x080000;
        static constexpr unsigned int t_await_close = 0x100000;
        static constexpr unsigned int t_await = 0xFF0000;
    };

    /** net_base mainly interfaces with the operating system.
    * No overdesign here; it's written rather crudely. It just has to work.
    */
    class net_base {
    public:
        net_base() {
            fd = INVALID_SOCKET;
            hIocp = NULL;
            AcceptEx = NULL;
            ConnectEx = NULL;
            DisconnectEx = NULL;
            StreamCapacity = 1440;
            Timeout = 0;
            DataBacklog = 0;
            WorkerThreadId = 0;
        }

        static bool sockaddr_from_string(sockaddr_in& _Addr, const std::string& _Dest) {
            _Addr.sin_addr.S_un.S_addr = INADDR_NONE;

            size_t pos = _Dest.find(":");
            if(pos == _nf) {
                nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data());
                return false;
            }

            auto strip = _Dest.substr(0, pos);
            auto strport = _Dest.substr(pos + 1);
            strport.erase(strport.find_last_not_of("\r\n\t ") + 1);
            strport.erase(0, strport.find_first_not_of("\r\n\t "));
            unsigned short port = (unsigned short)atoi(strport.c_str());
            if (!port) {
                nlog("%s->目标端口号错误:(%s)\n", __FUNCTION__, _Dest.data());
                return false;
            }
            
            strip.erase(strip.find_last_not_of("\r\n\t ") + 1);
            strip.erase(0, strip.find_first_not_of("\r\n\t "));
            auto it = std::find_if(strip.begin(), strip.end(), [](char c)->bool {
                return ((c < '0' || c > '9') && (c != '.'));
                });
            _Addr.sin_family = AF_INET;
            _Addr.sin_port = htons(port);
            if (it != strip.end()) {
                hostent* host = gethostbyname(strip.c_str());
                if (!host) {
                    nlog("%s->错误的目标域名:(%s)\n", __FUNCTION__, _Dest.data());
                    return false;
                }
                _Addr.sin_addr = *(in_addr*)(host->h_addr_list[0]);
            }
            else {
                _Addr.sin_addr.S_un.S_addr = inet_addr(strip.c_str());
            }

            if (_Addr.sin_addr.S_un.S_addr == INADDR_NONE) {
                nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data());
                return false;
            }
            return true;
        }

        static void sockaddr_any(sockaddr_in& _Addr, unsigned short _Port) {
            _Addr.sin_family = AF_INET;
            _Addr.sin_port = htons(_Port);
            _Addr.sin_addr.S_un.S_addr = INADDR_ANY;
        }

        static void sockaddr_local(sockaddr_in& _Addr, unsigned short _Port) {
            _Addr.sin_family = AF_INET;
            _Addr.sin_port = htons(_Port);
            _Addr.sin_addr.S_un.S_addr = htonl(INADDR_LOOPBACK);
        }

        static void* getmswsfunc(SOCKET s, GUID guid) {
            DWORD dwBytes;
            void* lpResult = nullptr;
            WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
                sizeof(guid), &lpResult, sizeof(lpResult), &dwBytes, NULL, NULL);
            return lpResult;
        }

        static std::string sockaddr_to_string(const sockaddr_in &_Addr) {
            char buf[256];
            sprintf(buf, "%d.%d.%d.%d:%d", _Addr.sin_addr.S_un.S_un_b.s_b1,
                _Addr.sin_addr.S_un.S_un_b.s_b2,
                _Addr.sin_addr.S_un.S_un_b.s_b3,
                _Addr.sin_addr.S_un.S_un_b.s_b4,
                ntohs(_Addr.sin_port));
            std::string _Result = buf;
            return _Result;
        }

    private:
        int init(int _StreamCapacity, int _DataBacklog, int _Timeout) {
            if (fd != INVALID_SOCKET) {
                return 0;
            }
            auto reterr = [this](int n) {
                if (fd != INVALID_SOCKET) {
                    closesocket(fd);
                    fd = INVALID_SOCKET;
                }
                return n;
            };
            StreamCapacity = _StreamCapacity;
            Timeout = _Timeout;
            if (Timeout < 0) {
                nlog("%s->Timeout必须>=0", __FUNCTION__);
                return reterr(-1);
            }
            DataBacklog = _DataBacklog;
            fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
            if (fd == INVALID_SOCKET) {
                nlog("%s->创建套接字失败:%d", __FUNCTION__, WSAGetLastError());
                return reterr(-1);
            }
            ConnectEx = (LPFN_CONNECTEX)getmswsfunc(fd, WSAID_CONNECTEX);
            if (!ConnectEx) {
                nlog("%s->获取 ConnectEx 地址失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                return reterr(-2);
            }
            AcceptEx = (LPFN_ACCEPTEX)getmswsfunc(fd, WSAID_ACCEPTEX);
            if (!AcceptEx) {
                nlog("%s->获取 AcceptEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                return reterr(-3);
            }
            
            // I have tested DisconnectEx more than once, and the conclusion is always that it does not increase the number of concurrent connections.
            // DisconnectEx looks faster in theory because it trades the system-wide global lock for the IOCP queue lock.
            // Another approach is a dedicated thread and table that call DisconnectEx blocking-style and then AcceptEx right away,
            // which ultimately shifts the global kernel lock entirely onto a lock of your own.
            // DisconnectEx also behaves differently across operating systems; the only safe time to reuse a socket with it is when the peer closes the connection.
            // For IOCP, that means after WSARecv or WSASend returns from GetQueuedCompletionStatus with the second parameter transferred == 0.
            // It is also affected by the TCP TIME_WAIT state:

            /*DisconnectEx = (LPFN_DISCONNECTEX)getmswsfunc(fd, WSAID_DISCONNECTEX);
            if (!DisconnectEx) {
                nlog("%s->获取 DisconnectEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                return reterr(-4);
            }*/

            hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
            if (!hIocp) {
                nlog("%s->创建完成端口失败,错误号:%d", __FUNCTION__, GetLastError());
                return reterr(-5);
            }
            CreateIoCompletionPort((HANDLE)fd, hIocp, 0, 0);
            return 0;
        }

        void close() {
            if (fd != INVALID_SOCKET) {
                closesocket(fd);
                fd = INVALID_SOCKET;
            }

            if (hIocp) {
                CloseHandle(hIocp);
                hIocp = NULL;
            }
        }

        BOOL Accept(SOCKET s, char* _Data, LPOVERLAPPED _Overlapped) {
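            // AcceptEx requires each address buffer to be at least sizeof(sockaddr_in) + 16 bytes;
            // sizeof(SOCKADDR_IN) << 1 == 32 satisfies that for IPv4. A receive length of 0 means
            // the accept completes without waiting for any payload.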
            DWORD _Received = 0;
            return AcceptEx(fd, s, _Data, 0, sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1, &_Received, _Overlapped);
        }

        BOOL Connect(SOCKET s, sockaddr* _Addr, int _AddrLen, LPOVERLAPPED _Overlapped) {
            DWORD _Sent = 0;
            return ConnectEx(s, _Addr, _AddrLen, nullptr, 0, &_Sent, _Overlapped);
        }

        /*BOOL Disconnect(SOCKET s, LPOVERLAPPED _Overlapped) {
            return DisconnectEx(s, _Overlapped, TF_REUSE_SOCKET, 0);
        }*/

        /* Synchronous messaging built on C++11 condition variables and mutexes for thread-safe IO; essentially this is just multithreaded output.
        * Because the completion port has no synchronous messaging of its own, this kind of operation always involves at least two locks (the IOCP lock plus one other):
        * 1. Dynamic new/delete: in the worst case this hits the system-wide global heap lock, so no.
        * 2. A single lock object of our own: the approach used here.
        * 3. A separate lock object per socket context: for a scenario that tops out at ~100k concurrent IOs, I doubt lock contention justifies going that far.
        */
        int SafeIOMessage(DWORD dwNumberOfBytesTransferred, ULONG_PTR dwCompletionKey) {
            std::unique_lock<std::mutex> lock(safeIO.mtx);
            safeIO.cv.wait(lock, [this]() {
                return (safeIO.s & 1);
            });
            if (safeIO.s == -1)
                return -1;
            safeIO.s = 0;
            PostQueuedCompletionStatus(hIocp, dwNumberOfBytesTransferred, dwCompletionKey, 0);
            safeIO.cv.wait(lock, [this]() {
                return (safeIO.s & 3);
            });
            if (safeIO.s == -1)
                return -1;
            int _Result = safeIO.result;
            safeIO.s = 1;
            safeIO.cv.notify_all();
            return _Result;
        }

        void InitSafeIO() {
            std::lock_guard<std::mutex> lg(safeIO.mtx);
            safeIO.s = 1;
        }

        void ExitSafeIO() {
            std::lock_guard<std::mutex> lg(safeIO.mtx);
            safeIO.s = -1;
            safeIO.cv.notify_all();
        }

        void SafeIOResult(int _Result) {
            // In theory the IOCP worker thread should not need to take the lock here; in practice, who knows. I take the pessimistic view.
            std::lock_guard<std::mutex> lg(safeIO.mtx);
            safeIO.result = _Result;
            safeIO.s = 2;
            safeIO.cv.notify_all();
        }

    private:
        friend class sock;
        friend class netio;
        friend class coio;
        SOCKET fd;
        HANDLE hIocp;
        LPFN_ACCEPTEX AcceptEx;
        LPFN_CONNECTEX ConnectEx;
        LPFN_DISCONNECTEX DisconnectEx;
        int StreamCapacity;
        int Timeout;
        int DataBacklog;
        DWORD WorkerThreadId;

        struct safeio_send_struct {
            sock* s;
            void* buf;
            int len;
        };

        struct SAFEIO {
            std::mutex mtx;
            std::condition_variable cv;
            int s = -1;
            int result = 0;
        }safeIO;
        
    };

    /* Derive directly from std::string to serve as the socket's various buffers */
    class sock_buffer : public std::string {
    public:
        using _Basetype = std::string;
        using _Basetype::_Basetype;
        void preset_length(size_t _Length) {
            // Poke VC's std::string internals at the binary level to change what std::string::length() returns.
            // The benefit is avoiding the fill/copy that std::string::resize() would do.
            // Note this only works with VC; G++'s std::string layout is different.
            struct __stlstr {
                const char str[0x10];
                size_t len;
            };
            if (this->capacity() < _Length)
                this->reserve(_Length);
            ((__stlstr*)this)->len = _Length;
        }
    };
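
    // For reference, a portable alternative (a sketch: it pays resize()'s value-initialization of
    // the new bytes, but touches no library internals) would be:
    //
    //   class sock_buffer : public std::string {
    //   public:
    //       using _Basetype = std::string;
    //       using _Basetype::_Basetype;
    //       void preset_length(size_t _Length) { this->resize(_Length); }
    //   };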

    /**
    * The coroutine task
    */
    template<typename _Ty>
    struct net_task_t {
        struct promise_type;
        using _Hty = std::coroutine_handle<promise_type>;
        struct promise_type {
            net_task_t get_return_object() { return { _Hty::from_promise(*this) }; }
            // Returning std::suspend_always from initial_suspend suspends the coroutine right after it is created.
            // Suspending here leaves time for set_sock to run; otherwise an empty coroutine function would be destroyed immediately after creation.
            auto initial_suspend() { return std::suspend_always{}; }

            auto final_suspend() noexcept { 
                s->on_destroy_coroutine(); 
                return std::suspend_never{}; 
            }
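            // final_suspend returns std::suspend_never, so the frame frees itself as soon as the
            // coroutine runs to completion; destroy() must therefore only be called on a handle
            // that is still suspended (the t_await check in ~sock guarantees this).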
            void unhandled_exception() { std::terminate(); }
            void return_void() { }
            _Ty* s = nullptr;
        };
        _Hty _Handle;
        void resume() { _Handle.resume(); }
        void destroy() { _Handle.destroy(); }
        void set_sock(_Ty* _s) { _Handle.promise().s = _s; }
    };

    /** Socket context */
    class sock {
        // Extended OVERLAPPED structure
        struct binding {
            OVERLAPPED ol;
            int opt;
            sock* s;
        };

        /**
        * The object type returned to the coroutine by recv
        */
        class sock_data {
            sock_data(sock* _s) : s(_s) {}
        public:
            char* data() { return s->ibuf.data(); }
            void erase(size_t _Count) { s->ibuf.erase(0, _Count); }
            size_t length() { return s->ibuf.length(); }
            void clear() { s->ibuf.clear(); }

        private:
            friend class sock;
            sock* s;
        };

        /** The object type returned to the coroutine by connect and accept.
        * Used for asynchronous send and close.
        * Other threads can also use this object to communicate; thread safety is handled,
        * though not very efficiently, since a global lock is involved.
        */
        class asyncsock {
        public:
            /**
            * send is the unlocked send path.
            * With no multithreading involved, send is safe as-is.
            */
            int send(void* data, int len) {
                if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                    return s->safe_send(data, len);
                }
                else {
                    return s->send(data, len);
                }
            }

            int send(const void* data, int len) {
                if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                    return s->safe_send(data, len);
                }
                else {
                    return s->send(data, len);
                }
            }

            void close() {
                if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                    s->safe_close();
                }
                else {
                    s->close();
                }
            }

            bool isactivated() { return s->isactivated(); }

            operator bool() {
                return (s != nullptr);
            }

            sockaddr_in& getsockaddr() {
                return s->getsockaddr();
            }

            // Response timeout; this is what lets clients send heartbeats.
            // The heartbeat mechanism is built on the OS function RegisterWaitForSingleObject
            // and fires at 2/3 of the Timeout passed to netio::init.
            // That means Timeout is not an exact figure; it is the price of giving clients a window to send heartbeats.
            // For example, with Timeout set to 6000, a genuinely timed-out client is detected 4000-8000 ms later.
            void ontimeout(void(*proc)(asyncsock)) {
                if (!s)
                    return;
                s->ontimeout = proc;
            }
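            // Usage sketch (hypothetical caller code): a capture-less lambda decays to the
            // required function pointer, e.g.
            //   s.ontimeout([](aqx::netio::asyncsock a) { a.send("ping", 5); });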
            
        private:
            bool operator<(const asyncsock& as) const{
                return (size_t)s < (size_t)as.s;
            }
            friend typename std::less<asyncsock>;

        private:
            friend class netio;
            friend class coio;
            friend class sock;
            sock* s = nullptr;
        };
        
        struct recv_awaitable {
            recv_awaitable(sock* s) : data(s) { }

            // When the compiler inlines both await_ready and await_suspend, an exception is raised in the coroutine;
            // forcing await_ready to be noinline makes the exception disappear.
            __declspec(noinline) bool await_ready() {
                // My current VS version is VS 2022 17.0.1.
                // There is a compiler bug here: whenever await_ready and await_suspend are both inlined,
                // the final switch from normal flow back into the coroutine grabs
                // __coro_frame_ptr.__resume_address and uses it as the recv_awaitable object,
                // which immediately raises an exception.
                // Ultimately, current VC has a bug between coroutines and lambda functions:
                // when a lambda serves as a coroutine function and gets inlined, all kinds of pointer errors can occur.
                // I reported the problem to the VS community; the reply was "under consideration", with no word on when it will be fixed.

                if (data.s->st & net_status::t_await_undo) {
                    data.s->ibuf.clear();
                    data.s->st &= (~net_status::t_await_undo);
                    return true;
                }
                return false;
            }

            void await_suspend(std::coroutine_handle<> handle) { }
            sock_data await_resume() const { 
                return data; 
            }
            sock_data data;
        };

        struct sock_awaitable {
            sock_awaitable(sock* _s) { s.s = _s; }
            __declspec(noinline) bool await_ready() {
                if (s.s->st & net_status::t_await_undo) {
                    s.s->st &= (~net_status::t_await_undo);
                    return true;
                }
                return false;
            }
            void await_suspend(std::coroutine_handle<> handle) { }
            sock::asyncsock await_resume() { return s; }
            sock::asyncsock s;
        };

        struct close_awaitable {
            close_awaitable(bool _IsSuspend) : IsSuspend(_IsSuspend) { }
            __declspec(noinline) bool await_ready() { return (IsSuspend == false); }
            void await_suspend(std::coroutine_handle<> handle) { }
            void await_resume() { }
            bool IsSuspend;
        };

        struct send_awaitable {
            send_awaitable(sock* _s) : s(_s) {}
            __declspec(noinline) bool await_ready() {
                if (s->st & net_status::t_await_undo) {
                    s->st &= (~net_status::t_await_undo);
                    return true;
                }
                return false;
            }
            void await_suspend(std::coroutine_handle<> handle) { }
            int await_resume() { return s->syncsendlen; }
            sock* s;
        };

    public:
        using opcode = net_status;
        sock(net_base* _v) {
            fd = INVALID_SOCKET;
            v = _v;
            st = 0;
            hTimer = NULL;
            rtime = 0;
            syncsendlen = 0;
            ontimeout = nullptr;
            memset(&input.ol, 0, sizeof(input.ol));
            memset(&output.ol, 0, sizeof(output.ol));
            
            if (v->Timeout)
                output.ol.hEvent = input.ol.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
            else
                output.ol.hEvent = input.ol.hEvent = NULL;
            output.s = input.s = this;
            output.opt = opcode::s_write;
            ibuf.reserve(v->StreamCapacity);
            obuf.reserve(v->StreamCapacity);
        }

        ~sock() {
            close();
            if (!output.ol.hEvent)
                return;
            CloseHandle(output.ol.hEvent);
            output.ol.hEvent = input.ol.hEvent = NULL;
            if (st & opcode::t_await) 
                co.destroy();
        }

        void on_destroy_coroutine() {
            close();
            st &= (~opcode::t_connector);
        }

        bool isactivated() {
            return ((st & opcode::t_activated) != 0);
        }

        int send(void* data, int len) {
            if (!len)
                return len;
            int n = (int)(obuf.capacity() - obuf.length());
            if (n >= len && !obacklog.length()) {
                obuf.append((char*)data, len);
            }
            else {
                if (v->DataBacklog != 0 && obacklog.length() + len > v->DataBacklog) {
                    // backlog limit exceeded
                    close();
                    return -1;
                }
                obacklog.append((char*)data, len);
            }
            return (write() == 0) ? len : -1;
        }

        int send(const void* data, int len) {
            return send((void*)data, len);
        }

        int safe_send(void* data, int len) {
            net_base::safeio_send_struct param = { this, data, len };
            return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);
        }

        int safe_send(const void* data, int len) {
            net_base::safeio_send_struct param = { this, (void*)data, len };
            return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);
        }

        int safe_close() {
            return v->SafeIOMessage(opcode::s_close, (ULONG_PTR)this);
        }

        void close() {
            if (INVALID_SOCKET == fd)
                return;
            ontimeout = nullptr;
            closesocket(fd);
            fd = INVALID_SOCKET;
            st &= ~opcode::t_activated;
            st |= opcode::s_close;
            set_timer(false);
            ibuf.clear();
            if (obacklog.capacity() <= 0x0F)
                return;
            sock_buffer tmp;
            obacklog.swap(tmp);
        }

        sockaddr_in& getsockaddr() { return sa; }

    private:
        int initfd() {
            if (INVALID_SOCKET != fd) {
                return 0;
            }
                
            fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
            if (INVALID_SOCKET == fd) {
                nlog("%s->创建套接字失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                return -1;
            }
            LINGER linger = { 1, 0 };
            setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
            int b = 1;
            setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&b, sizeof(b));
            CreateIoCompletionPort((HANDLE)fd, v->hIocp, 0, 0);
            return 0;
        }

        int bindlocal() {
            sockaddr_in local;
            local.sin_family = AF_INET;
            local.sin_addr.S_un.S_addr = INADDR_ANY;
            local.sin_port = 0;
            if (SOCKET_ERROR == bind(fd, (LPSOCKADDR)&local, sizeof(local))) {
                nlog("%s->绑定本地端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                return -1;
            }
            return 0;
        }

        bool set_dest(const std::string& _Dest) {
            return net_base::sockaddr_from_string(sa, _Dest);
        }

        void set_timer(bool _Enable) {
            if (_Enable) {
                if (hTimer)
                    return;
                RegisterWaitForSingleObject(&hTimer, output.ol.hEvent, [](void* Param, BOOLEAN TimerOrWaitFired) {
                    if (!TimerOrWaitFired)
                        return;
                    sock* p = (sock*)Param;
                    PostQueuedCompletionStatus(p->v->hIocp, 0, (ULONG_PTR)p, nullptr);
                }, this, (ULONG)v->Timeout * 2 / 3, WT_EXECUTEDEFAULT);
            }
            else {
                if (!hTimer)
                    return;
                std::ignore = UnregisterWaitEx(hTimer, NULL);
                hTimer = NULL;
            }
        }

        int nat() {
            sockaddr_in _Addr;
            int _AddrLen = sizeof(_Addr);
            if (-1 == getsockname(fd, (sockaddr*)&_Addr, &_AddrLen))
                return -1;
            SOCKET fdNat = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
            LINGER linger = { 1, 0 };
            setsockopt(fdNat, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
            CreateIoCompletionPort((HANDLE)fdNat, v->hIocp, 0, 0);
            if (-1 == bind(fdNat, (sockaddr*)&_Addr, sizeof(_Addr))) {
                closesocket(fdNat);
                return -1;
            }
            close();
            fd = fdNat;
            return connect();
        }

        int accept() {
            if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {
                nlog("%s->当前套接字未断开连接!", __FUNCTION__);
                return -1;
            }

            if (initfd())
                return -1;
            DWORD _Received = 0;
            input.opt = opcode::s_accept;
            st &= (~opcode::s_close);
            st |= opcode::s_accept;
            if (!v->Accept(fd, ibuf.data(), &input.ol)) {
                int _Error = WSAGetLastError();
                if (_Error != ERROR_IO_PENDING) {
                    st &= (~opcode::s_accept);
                    nlog("%s->AcceptEx失败, 错误号:", __FUNCTION__, WSAGetLastError());
                    return -1;
                }
            }
            return 0;
        }

        int connect() {
            if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {
                nlog("%s->当前套接字未断开连接!", __FUNCTION__);
                return -1;
            }
            if (INVALID_SOCKET == fd) {
                if (initfd())
                    return -1;
                if (bindlocal())
                    return -1;
            }
            input.opt = opcode::s_connect;
            st &= (~opcode::s_close);
            st |= opcode::s_connect;

            if (!v->Connect(fd, (sockaddr*)&sa, sizeof(sa), &input.ol)) {
                int _Error = WSAGetLastError();
                if (_Error != ERROR_IO_PENDING) {
                    nlog("%s->ConnectEx失败, 错误号:", __FUNCTION__, WSAGetLastError());
                    return -1;
                }
            }
            return 0;
        }

        int write() {
            if (!(st & opcode::t_activated)) {
                return -1;
            }
            if (st & (opcode::s_write | opcode::s_close | opcode::s_accept | opcode::s_connect))
                return 0;
            if (obacklog.size()) {
                size_t rl = obuf.capacity() - obuf.length();
                if (rl > obacklog.length())
                    rl = obacklog.length();
                if (rl) {
                    obuf.append(obacklog.data(), rl);
                    obacklog.erase(0, rl);
                }
            }
            WSABUF buf = { (ULONG)(obuf.length()), obuf.data() };
            if (!buf.len)
                return 0;
            st |= opcode::s_write;
            DWORD _Sent = 0;
            if (SOCKET_ERROR == WSASend(fd, &buf, 1, &_Sent, 0, &(output.ol), NULL)) {
                int _Error = WSAGetLastError();
                if (WSA_IO_PENDING != _Error) {
                    st &= (~opcode::s_write);
                    return -1;
                }
            }
            return 0;
        }

        int read() {
            if (!(st & opcode::t_activated)) {
                return -1;
            }
            if (st & (opcode::s_read | opcode::s_close | opcode::s_accept | opcode::s_connect))
                return 0;
            WSABUF buf = {
                (ULONG)(ibuf.capacity() - ibuf.length()),
                ibuf.data() + ibuf.length()
            };
            if ((int)buf.len <= 0) {
                return -1;
            }
            DWORD _Received = 0;
            DWORD _Flags = 0;
            st |= opcode::s_read;
            input.opt = opcode::s_read;
            if (SOCKET_ERROR == WSARecv(fd, &buf, 1, &_Received, &_Flags, &(input.ol), NULL)) {
                int _Error = WSAGetLastError();
                if (WSA_IO_PENDING != _Error) {
                    st &= ~(opcode::s_read);
                    return -1;
                }
            }
            return 0;
        }

    private:
        friend class coio;
        friend class netio;
        SOCKET fd;
        sockaddr_in sa;
        net_base* v;
        int st;
        binding input, output;
        sock_buffer ibuf, obuf, obacklog;
        HANDLE hTimer;
        aqx::clock64_t rtime;
        net_task_t<sock> co;
        void (*ontimeout)(asyncsock);
        int syncsendlen;
    };

    // coio is the operation object passed to the coroutine function
    class coio {
        coio(sock* _s) : s(_s) {}

    public:
        using asyncsock = sock::asyncsock;
        using sock_awaitable = sock::sock_awaitable;
        using close_awaitable = sock::close_awaitable;
        using send_awaitable = sock::send_awaitable;
        using recv_awaitable = sock::recv_awaitable;

        struct nat_awaitable {
            nat_awaitable(bool _ret) : ret(_ret) {  }
            __declspec(noinline) bool await_ready() { return (ret == false); }
            void await_suspend(std::coroutine_handle<> handle) { }
            bool await_resume() { return ret; }
            bool ret;
        };

        coio() : s(nullptr) {}

        sock_awaitable connect(const std::string& _Dest) {
            if (!s->set_dest(_Dest)) {
                // Setting the destination address failed: cancel the await.
                s->st |= net_status::t_await_undo;
                return sock_awaitable(s);
            }

            // My coroutines are resumed immediately after creation rather than staying suspended,
            // so a socket's first connect is almost always initiated from another thread,
            // and the IOCP queue may well have completed before await_suspend even runs.
            if (GetCurrentThreadId() == s->v->WorkerThreadId) {
                if (s->connect()) {
                    // Connect failed: cancel the await.
                    s->st |= net_status::t_await_undo;
                    return sock_awaitable(s);
                }
            }
            else {
                // Hence a connect that does not come from the IOCP worker thread is posted to the IOCP queue to be handled there.
                PostQueuedCompletionStatus(s->v->hIocp, net_status::s_connect, (ULONG_PTR)s, 0);
            }

            s->st |= net_status::t_await_connect;
            return sock_awaitable(s);
        }

        sock_awaitable accept() {
            // The first accept is likewise called from another thread (usually main),
            // but at that point the IOCP worker thread has not started yet, so connect's issue above does not apply.
            s->st |= ((!s->accept()) ? net_status::t_await_accept : net_status::t_await_undo);
            return sock_awaitable(s);
        }

        /**
        * In the member functions below, the asyncsock _s parameter should be identical to the private member s,
        * unless a coio object is forcibly used from the outside.
        * Taking it as a parameter rather than using the private member prevents IO calls before a connection exists;
        * the private member s is reserved for accept and connect.
        */
        close_awaitable close(asyncsock _s) {
            _s.s->close();
            if ((_s.s->st & 0xFF) == net_status::s_close) {
                // If the socket has no outstanding IO events, let the awaitable resume the coroutine immediately.
                // That is the normal case; but if another thread does an asynchronous send, there may be pending IO.
                return close_awaitable(false);
            }
            _s.s->st |= net_status::t_await_close;
            return close_awaitable(true);
        }

        send_awaitable send(asyncsock _s, void *buf, int len) {
            _s.s->syncsendlen = _s.send(buf, len);
            _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);
            return sock::send_awaitable(_s.s);
        }

        send_awaitable send(asyncsock _s, const void* buf, int len) {
            _s.s->syncsendlen = _s.send(buf, len);
            _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);
            return sock::send_awaitable(_s.s);
        }

        recv_awaitable recv(asyncsock _s) {
            int n = _s.s->read();
            if (n < 0) {
                _s.s->st |= net_status::t_await_undo;
            }
            else {
                _s.s->st |= net_status::t_await_read;
            }
            return recv_awaitable(_s.s);
        }

        nat_awaitable nat(asyncsock _s, const std::string& _Dest) {
            if ((_s.s->st & 0xFF) != net_status::t_activated) {
                // Before nat, every pending IO must already have returned and the connection to the
                // hole-punching server must be healthy; otherwise this fails.
                // Failing here leaves the connection to the hole-punching server intact.
                return nat_awaitable(false);
            }

            sockaddr_in sa = _s.s->sa;
            if (!_s.s->set_dest(_Dest)) {
                // Setting the destination address failed.
                // Failing here still leaves the connection to the hole-punching server intact.
                _s.s->sa = sa;
                return nat_awaitable(false);
            }

            if (_s.s->nat()) {
                // Failing at this step may break the connection to the hole-punching server.
                // When nat fails you should just close(); everything failed anyway,
                // and I cannot think of a reason to keep clinging to the hole-punching server.
                // If all the state is right and it still fails, the two peers are probably behind NAT types that simply cannot be traversed.
                // I have not researched this deeply; few in the industry truly have, and material is scarce. What I do know is how TCP NAT traversal looks in code:
                //     1. The socket that stays connected to the hole-punching server sets SO_REUSEADDR so its local port can be reused.
                //          This library sets everything reusable, though mainly to ease TIME_WAIT rather than for traversal.
                //     2. The two peers exchange their remote addresses through the hole-punching server.
                //     3. Each peer creates a new socket and binds it to the local address used for the connection to the hole-punching server (obtainable via getsockname).
                //          After step 3, the socket connected to the hole-punching server is dead and can no longer communicate; it should be closed.
                //     4. Finally, both peers connect to each other's address.
                _s.s->sa = sa;
                return nat_awaitable(false);
            }

            s->st |= net_status::t_await_connect;
            return nat_awaitable(true);
        }

        bool valid() {
            return (s != nullptr);
        }

        operator bool () {
            return valid();
        }

    private:
        friend class netio;
        sock* s;
    };

    /**
    * netio can be thought of as a simple container.
    * It interfaces with net_base, creates the worker thread, and dispatches IO events.
    */
    class netio {
        struct IOCP_STATUS {
            DWORD transferred;
            SIZE_T key;
            typename sock::binding* pb;
            BOOL ok;
        };

    public:
        /** listener is just a simple parameter wrapper to make construction convenient.
        * Constructor parameters:
        * _Dest: the address and port to listen on, formatted "a.b.c.d:port"
        * _ListenBacklog: the second parameter of the listen() system call
        * _MaxClients: the maximum number of clients accepted at the same time
        */
        class listener {
        public:
            listener() {
                max_clients = 0;
                listen_backlog = 0;
                addr.sin_addr.S_un.S_addr = INADDR_NONE;
            }

            listener(const std::string& _Dest, int _ListenBacklog, size_t _MaxClients) {
                max_clients = _MaxClients;
                listen_backlog = _ListenBacklog;
                net_base::sockaddr_from_string(addr, _Dest);
            }

        private:
            friend class netio;
            sockaddr_in addr;
            int listen_backlog;
            size_t max_clients;
        };

        using asyncsock = sock::asyncsock;
        using sock_data = sock::sock_data;
        using opcode = net_status;
        using task = net_task_t<sock>;

        int init(int _StreamCapacity = 1440, int _DataBacklog = 0, int _Timeout = 0) {
            std::lock_guard<std::mutex> lg(mtx);
            return nwb.init(_StreamCapacity, _DataBacklog, _Timeout);
        }

        int server(const std::function<task(coio)> &_func, const listener &param) {
            std::lock_guard<std::mutex> lg(mtx);
            if (thd.joinable()) {
                nlog("%s->netio已启动, 请勿重复调用!", __FUNCTION__);
                return 0;
            }

            if (nwb.fd == INVALID_SOCKET)
                return -1;

            cofunc = _func;
            if (param.addr.sin_addr.S_un.S_addr != INADDR_NONE) {
                if (SOCKET_ERROR == bind(nwb.fd, (SOCKADDR*)&param.addr, sizeof(SOCKADDR))) {
                    nlog("%s->绑定端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    nwb.close();
                    return -1;
                }

                if (SOCKET_ERROR == ::listen(nwb.fd, param.listen_backlog)) {
                    nlog("%s->监听失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    nwb.close();
                    return -1;
                }

                for (size_t i = 0; i < param.max_clients; i++) {
                    sock* psock = new sock(&nwb);
                    a_list.push_back(psock);
                    psock->st |= opcode::t_acceptor;
                    psock->co = cofunc(coio(psock));
                    psock->co.set_sock(psock);
                    psock->co.resume();
                }
            }
            __start();
            return 0;
        }



        // client is one-shot and intended for the client side.
        // It returns an asyncsock object for the benefit of scripting languages:
        // with Lua, for example, you could mimic node.js's pattern of getting an object back from connect
        // and binding events to it before knowing whether the connection succeeded.
        asyncsock client(const std::function<task(coio)>& _func) {
            std::lock_guard<std::mutex> lg(mtx);
            coio io;
            asyncsock ret;
            if (!thd.joinable()) {
                // If the worker thread is not running, try to start it; reclaiming resources afterwards requires stop() and release().
                if (nwb.fd == INVALID_SOCKET)
                    return ret;
                __start();
            }
            io.s = get_connector();
            ret.s = io.s;
            io.s->co = _func(io);
            io.s->co.set_sock(io.s);
            io.s->co.resume();
            return ret;
        }

        void exec(const std::function<void()>& _func) {
            if (!thd.joinable()) {
                // If the worker thread is not running, try to start it; reclaiming resources afterwards requires stop() and release().
                if (nwb.fd == INVALID_SOCKET)
                    return;
                __start();
            }

            nwb.SafeIOMessage(opcode::s_exec, (ULONG_PTR)&_func);
        }

        void stop() {
            std::lock_guard<std::mutex> lg(mtx);
            if (thd.joinable()) {
                PostQueuedCompletionStatus(nwb.hIocp, -1, 0, 0);
                thd.join();
            }
        }

        void release() {
            std::lock_guard<std::mutex> lg(mtx);
            if (thd.joinable()) {
                nlog("%s->nio正在运行,请先stop", __FUNCTION__);
                return;
            }

            for (auto p : a_list) {
                delete p;
            }
            a_list.clear();

            for (auto p : c_list) {
                delete p;
            }
            c_list.clear();
            nwb.close();
        }

    private:
        sock* get_connector() {
            sock* psock = nullptr;
            
            for (auto v : c_list) {
                if ((v->st & opcode::t_connector) == 0 && ((v->st & 0xFF)| opcode::s_close) == opcode::s_close) {
                    psock = v;
                    break;
                }
            }

            if (!psock) {
                psock = new sock(&nwb);
                c_list.push_back(psock);
            }

            psock->st |= opcode::t_connector;
            return psock;
        }

        void on_connect(sock& s) {
            s.ibuf.clear();
            s.obuf.clear();
            s.obacklog.clear();
            s.rtime = aqx::now();
            if (nwb.Timeout != 0)
                s.set_timer(true);
            s.st |= opcode::t_activated;
        }
        
        void on_accept(sock &s) {
            // Too lazy to call GetAcceptExSockAddrs; hard-coded offsets do the job.
#ifndef _WIN64
            s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x26);
#else
            s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x20);
#endif
            on_connect(s);
        }
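        // Portable alternative (sketch, not used here): load LPFN_GETACCEPTEXSOCKADDRS via
        // WSAID_GETACCEPTEXSOCKADDRS (the same WSAIoctl trick used for AcceptEx) and let it
        // parse the buffer instead of the offsets above:
        //   sockaddr* local = nullptr; sockaddr* remote = nullptr;
        //   int ll = 0, rl = 0;
        //   GetAcceptExSockaddrs(s.ibuf.data(), 0, sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1,
        //                        &local, &ll, &remote, &rl);
        //   s.sa = *(sockaddr_in*)remote;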
        
        bool on_resume(sock& s) {
            if (s.st & opcode::t_await) {
                // clear all coroutine await flags
                s.st &= (~opcode::t_await);

                // resume the coroutine
                s.co.resume();
                return true;
            }
            return false;
        }

        void on_close(sock& s) {
            if ((s.st & 0xFF) == opcode::s_close) {
                s.st &= ~opcode::s_close;
                on_resume(s);
            }
        }

        bool error_resume(sock &s) {
            int st = s.st & opcode::t_await;
            switch (st) {
            case opcode::t_await_accept:
            case opcode::t_await_connect:
            case opcode::t_await_close:
                s.st &= (~opcode::t_await);
                s.co.resume();
                return true;
            case opcode::t_await_read:
                s.ibuf.clear();
                s.st &= (~opcode::t_await);
                s.co.resume();
                return true;
            case opcode::t_await_write:
                s.syncsendlen = -1;
                s.st &= (~opcode::t_await);
                s.co.resume();
                return true;
            default:
                break;
            }
            return false;
        }

        void on_reset(sock &s) {
            if ((s.st & 0xFF) == opcode::s_close) {
                s.st &= ~opcode::s_close;
                if (s.st & opcode::t_acceptor) {
                    // This happens when the server coroutine is not wrapped in a loop and destroyed itself on return.
                    // The remedy is to create a fresh coroutine, wired up the same way server() does it.
                    s.co = cofunc(coio(&s));
                    s.co.set_sock(&s);
                    s.co.resume();
                }
            }
        }

        void on_completion(IOCP_STATUS& st) {
            sock& s = *(st.pb->s);
            int op = st.pb->opt;
            s.st &= (~op);
            if (s.st & opcode::s_close)
                op = 0;
            //nlog("on_completion:%I64X, %d", &s, op);
            switch (op) {
            case 0:
                break;
            case opcode::s_accept:
                on_accept(s);
                break;
            case opcode::s_connect:
                if (!st.ok && WSAGetLastError() == ERROR_CONNECTION_REFUSED) { // 1225
                    // This usually means nothing is listening on the destination port and the OS rejected the connection outright.
                    op = 0;
                    break;
                }
                on_connect(s);
                break;
            case opcode::s_read:
                if (!st.transferred) {
                    op = 0;
                    break;
                }
                s.rtime = aqx::now();
                s.ibuf.preset_length(s.ibuf.length() + st.transferred);
                break;
            case opcode::s_write:
                if (!st.transferred) {
                    op = 0;
                    break;
                }
                s.rtime = aqx::now();
                s.obuf.erase(0, st.transferred);
                if (s.obuf.length() || s.obacklog.length()) {
                    if (s.write()) {
                        op = 0;
                        break;
                    }
                }
                // The write may have been initiated outside the coroutine, which is likely suspended in recv, so check first.
                if (!(s.st & opcode::t_await_write))
                    return;
                break;
            }
            
            //nlog("on_completion2:%I64X, %d", &s, op);
            if (!op) {
                if (error_resume(s))
                    return;
                // error_resume only returns false when the coroutine has been destroyed.
                s.close();
                on_reset(s);
                return;
            }
            
            on_resume(s);
            if (s.st & opcode::s_close)
                return on_close(s);
        }

        void on_msgtimeout(sock *psock) {
            if (aqx::now() - psock->rtime >= nwb.Timeout && (psock->st & opcode::t_activated)) {
                psock->close();
                if (error_resume(*psock))
                    return;
                on_reset(*psock);
                return;
            }

            if (psock->ontimeout != nullptr) {
                asyncsock as;
                as.s = psock;
                psock->ontimeout(as);
            }
        }

        void on_msgconnect(sock* psock) {
            if (psock->connect()) {
                psock->close();
                if (error_resume(*psock))
                    return;
                on_reset(*psock);
            }
        }

        void on_msgwrite(net_base::safeio_send_struct* pss) {
            nwb.SafeIOResult(pss->s->send(pss->buf, pss->len));
        }

        void on_msgclose(sock* psock) {
            psock->close();
            nwb.SafeIOResult(0);
        }

        void __start() {
            thd = std::thread([this]() {
                nwb.WorkerThreadId = GetCurrentThreadId();
                srand((unsigned int)aqx::now() + nwb.WorkerThreadId);
                nwb.InitSafeIO();
                IOCP_STATUS st = { 0,0,0,0 };
                //nlog("netio::worker->I/O工作线程 %d 开始!", nwb.WorkerThreadId);
                
                for (;;) {
                    st.ok = GetQueuedCompletionStatus(nwb.hIocp,
                        &(st.transferred),
                        &(st.key),
                        (OVERLAPPED**)&(st.pb),
                        INFINITE);

                    if (!st.pb) {

                        if (st.transferred == -1) 
                            break;
                        
                        switch (st.transferred) {
                        case 0:
                            on_msgtimeout((sock*)st.key);
                            break;
                        case opcode::s_connect:
                            on_msgconnect((sock*)st.key);
                            break;
                        case opcode::s_write: 
                            on_msgwrite((net_base::safeio_send_struct*)st.key);
                            break;
                        case opcode::s_close:
                            on_msgclose((sock*)st.key);
                            break;
                        case opcode::s_exec:
                            (*((std::function<void()>*)st.key))();
                            nwb.SafeIOResult(0);
                            break;
                        }
                        continue;
                    }
                    on_completion(st);
                }
                
                nwb.ExitSafeIO();
                nwb.WorkerThreadId = 0;
                //nlog("netio::worker->I/O工作线程 %d 已停止!", nwb.WorkerThreadId);
            });
        }

        

    private:
        net_base nwb;
        std::list<sock*> a_list;
        std::list<sock*> c_list;
        std::function<task(coio)> cofunc;
        std::thread thd;
        std::mutex mtx;
    };
}

#pragma warning(pop)

I have stripped all coupling from this library except the logging; aqx::log is a simple formatted logger I wrote myself:

logger.hpp
#pragma once
#include <iostream>
#include <string>
#include <time.h>
#include <stdarg.h>
#include <mutex>
#include <vector>
#include <string_view>

// aqx::log is not coupled to the other aqx libraries
#if defined(_WIN32) || defined(_WIN64)
#ifndef _WINDOWS_
#include <WinSock2.h>
#endif
#define __aqxlog_getpid GetCurrentProcessId
#define __aqxlog_gettid GetCurrentThreadId
#include <io.h>
#else
#if defined(__linux__)
#include <unistd.h>
#include <sys/syscall.h>
#define __aqxlog_getpid getpid
#define __aqxlog_gettid() syscall(__NR_gettid)
#endif
#endif

#pragma warning(push)
#pragma warning(disable:4996)

namespace aqx {

    class log {
    private:
        struct _format_texts {
            std::string time;
            std::string type;
            std::string pid;
            std::string tid;
        };

    public:
        static constexpr auto hs_time{ static_cast<int>(1) };
        static constexpr auto hs_type{ static_cast<int>(2) };
        static constexpr auto hs_pid{ static_cast<int>(4) };
        static constexpr auto hs_tid{ static_cast<int>(8) };

        log() {
            _stdout_fp = stdout;
            fp = stdout;
            _fmtts = { "%Y/%m/%d %H:%M:%S ", "{%s} ",  "[%d] ",  "(%d) " };
            head_style = log::hs_time;
            head_presize = _gethps();
            _EnableInfo = true;
            _EnableError = false;
            _EnableDebug = false;
            _EnableWarn = false;
            _DefType = "info";
            s.reserve(0x1000);
        }

        ~log() {
            if (fp != _stdout_fp)
                fclose(fp);
        }

        void enable(const std::string_view& _Type, bool _Enable) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (_Type == "info")
                _EnableInfo = _Enable;
            else if (_Type == "error")
                _EnableError = _Enable;
            else if (_Type == "debug")
                _EnableDebug = _Enable;
            else if (_Type == "warn")
                _EnableWarn = _Enable;
        }

        void seths(int hs) {
            std::lock_guard<std::mutex> lg(_Mtx);
            head_style = hs;
            head_presize = _gethps();
        }

        void sethfmt(int _Style, const char* _Fmt) {
            std::lock_guard<std::mutex> lg(_Mtx);
            switch (_Style) {
            case hs_time:
                _fmtts.time = _Fmt;
                break;
            case hs_type:
                _fmtts.type = _Fmt;
                break;
            case hs_pid:
                _fmtts.pid = _Fmt;
                break;
            case hs_tid:
                _fmtts.tid = _Fmt;
                break;
            }
            head_presize = _gethps();
        }

        bool setvfs(const char* _FileName, bool _PutStdout = false) {
            std::lock_guard<std::mutex> lg(_Mtx);
            FILE* _tmp = fopen(_FileName, "ab");
            if (!_tmp)
                return false;
            if (fp != _stdout_fp)
                fclose(fp);
            fp = _tmp;
            PutStdout = _PutStdout;
            return true;
        }

        log& info(const char* _Fmt, ...) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (!_EnableInfo)
                return *this;
            va_list vl;
            va_start(vl, _Fmt);
            _build("info", _Fmt, vl);
            va_end(vl);
            _putlog();
            return *this;
        }

        log& debug(const char* _Fmt, ...) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (!_EnableDebug)
                return *this;
            va_list vl;
            va_start(vl, _Fmt);
            _build("info", _Fmt, vl);
            va_end(vl);
            _putlog();
            return *this;
        }

        log& error(const char* _Fmt, ...) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (!_EnableError)
                return *this;
            va_list vl;
            va_start(vl, _Fmt);
            _build("info", _Fmt, vl);
            va_end(vl);
            _putlog();
            return *this;
        }

        log& warn(const char* _Fmt, ...) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (!_EnableWarn)
                return *this;
            va_list vl;
            va_start(vl, _Fmt);
            _build("info", _Fmt, vl);
            va_end(vl);
            _putlog();
            return *this;
        }

        log& operator()(const char* _Fmt, ...) {
            std::lock_guard<std::mutex> lg(_Mtx);
            if (!_EnableInfo)
                return *this;
            va_list vl;
            va_start(vl, _Fmt);
            _build(_DefType.c_str(), _Fmt, vl);
            va_end(vl);
            _putlog();
            return *this;
        }

    private:
        void _putlog() {
            fputs(s.data(), fp);
            if (fp != _stdout_fp) {
                //fflush(fp);
                if (PutStdout)
                    fputs(s.data(), _stdout_fp);
            }
        }

        size_t _build(const char* _Type, const char* _Fmt, va_list vl) {
            s.clear();
            va_list vl2;
            va_copy(vl2, vl);   // vl is consumed twice below; va_copy keeps that portable
            int n = vsnprintf(nullptr, 0, _Fmt, vl2);
            va_end(vl2);
            if (n <= 0)
                return _build_head(_Type);
            if ((size_t)n >= s.capacity()) {
                s.clear();
                s.reserve(n + head_presize);
            }
            size_t _Pos = _build_head(_Type);
            char* p = (char*)s.data();
            _Pos += vsnprintf(p + _Pos, s.capacity() - _Pos, _Fmt, vl);
            char c = p[_Pos - 1];
#ifdef _WINDOWS_
            if (c != '\r' && c != '\n') {
                p[_Pos++] = '\r';
                p[_Pos++] = '\n';
                p[_Pos] = '\0';
            }

#else
            if (c != '\r' && c != '\n') {
                p[_Pos++] = '\n';
                p[_Pos] = '\0';
            }
#endif

            return _Pos;
        }

        size_t _build_time(size_t _Pos) {
            if (!(head_style & log::hs_time))
                return _Pos;
            time_t t = time(NULL);
            auto _Tm = localtime(&t);
            _Pos += strftime((char*)s.data() + _Pos, head_presize, _fmtts.time.c_str(), _Tm);
            return _Pos;
        }

        size_t _build_type(size_t _Pos, const char* _Type) {
            if (!(head_style & log::hs_type))
                return _Pos;
            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.type.c_str(), _Type);
            return _Pos;
        }

        size_t _build_pid(size_t _Pos) {
            if (!(head_style & log::hs_pid))
                return _Pos;
            auto _Pid = __aqxlog_getpid();
            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.pid.c_str(), _Pid);
            return _Pos;
        }

        size_t _build_tid(size_t _Pos) {
            if (!(head_style & log::hs_tid))
                return _Pos;
            auto _Tid = __aqxlog_gettid();
            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.tid.c_str(), _Tid);
            return _Pos;
        }

        size_t _build_head(const char* _Type) {
            return _build_tid(_build_pid(_build_type(_build_time(0), _Type)));
        }

        size_t _gethps() {
            size_t _Result = 3;
            if (head_style & log::hs_time)
                _Result += ((_fmtts.time.length() << 1) + 30);
            if (head_style & log::hs_type)
                _Result += ((_fmtts.type.length() << 1) + 12);
            if (head_style & log::hs_pid)
                _Result += ((_fmtts.pid.length() << 1) + 20);
            if (head_style & log::hs_tid)
                _Result += ((_fmtts.tid.length() << 1) + 20);
            return _Result;
        }

    private:
        std::vector<char> s;
        FILE* fp;
        _format_texts _fmtts;
        int head_style;
        size_t head_presize;
        bool PutStdout;
        FILE* _stdout_fp;
        std::mutex _Mtx;
        std::string _DefType;
        bool _EnableInfo;
        bool _EnableDebug;
        bool _EnableError;
        bool _EnableWarn;
    };
}

static aqx::log logger;
#pragma warning(pop)
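
A minimal usage sketch (the log file name is just an example):

// Timestamped header plus thread id, mirrored to a file and to stdout.
logger.seths(aqx::log::hs_time | aqx::log::hs_tid);
logger.setvfs("netio.log", true);   // true: also echo each line to stdout
logger.enable("error", true);       // error/debug/warn are disabled by default
logger("listening on %s", "0.0.0.0:55554");
logger.error("connect failed, error: %d", 10061);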

Finally, the test code. Client and server are lumped together; to split them, separate at the few spots after nio.init.

// main.cpp
#include <iostream>
#include <aqx/netio.hpp>

int main()
{
    aqx::init_winsock();

    aqx::netio nio;
    nio.init(1440, 0x10000);

    // A simple echo server example:

    nio.server([](aqx::coio io)->aqx::netio::task {
        // The server should always sit inside an infinite loop; otherwise the fallback logic will keep creating fresh coroutines.
        for (;;) {
            // io.accept returns an object that can be used for asynchronous send and close
            auto s = co_await io.accept();
            logger("client connected: %s", aqx::net_base::sockaddr_to_string(s.getsockaddr()).data());
            for (;;) {
                auto buf = co_await io.recv(s);
                if (!buf.length()) {
                    logger("断开连接!");
                    break;
                }

                puts(buf.data());
                buf.clear();
                // asynchronous send; the coroutine does not suspend here
                s.send("Got it!", 8);
                
            }
            co_await io.close(s);
            logger("已关闭!");
        }
    }, aqx::netio::listener("0.0.0.0:55554", 100, 100));



    // I'm lazy enough to keep client and server in one file; split them yourself if needed.
    auto sock1 = nio.client([](aqx::coio io)->aqx::netio::task {
        // A client only needs the loop if it should auto-reconnect.
        for (;;) {
            auto s = co_await io.connect("127.0.0.1:55554");
            if (!s) {
                co_await io.close(s);
                continue;
            }

            for (;;) {
                auto buf = co_await io.recv(s);
                if (!buf.length()) {
                    break;
                }
                puts(buf.data());
                buf.clear();
            }
            
            co_await io.close(s);
        }
       
    });

    // I'm lazy enough to keep client and server in one file; split them yourself if needed.
    auto sock2 = nio.client([](aqx::coio io)->aqx::netio::task {
        // A client only needs the loop if it should auto-reconnect.
        for (;;) {
            auto s = co_await io.connect("127.0.0.1:55554");
            if (!s) {
                co_await io.close(s);
                continue;
            }

            for (;;) {
                auto buf = co_await io.recv(s);
                if (!buf.length()) {
                    break;
                }
                puts(buf.data());
                buf.clear();
            }

            co_await io.close(s);
        }

    });
    
    std::string str;
    for (;;) {
        std::cin >> str;
        if (str == "exit")
            break;

        std::string sd = "sock1:";
        sd += str;
        sock1.safe_send(sd.data(), (int)sd.length() + 1);

        sd = "sock2:";
        sd += str;
        sock2.safe_send(sd.data(), (int)sd.length() + 1);
    }

    nio.stop();
    nio.release();
}
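
If you want a standalone server, a sketch of the split mentioned above is simply init + server + the stop/release tail:

// Server-only main (sketch): pass the same server coroutine shown above.
int main()
{
    aqx::init_winsock();
    aqx::netio nio;
    nio.init(1440, 0x10000);
    nio.server(/* the server coroutine from above */, aqx::netio::listener("0.0.0.0:55554", 100, 100));
    std::string str;
    for (;;) {
        std::cin >> str;
        if (str == "exit")
            break;
    }
    nio.stop();
    nio.release();
}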

Let me be a little responsible here: since I found a compiler bug, I might as well trace it.

When recv_awaitable::await_ready() is inlined, the resume from normal flow back into the coroutine goes through the following:
00007FF723AF6000 mov r11,rsp
00007FF723AF6003 mov qword ptr [r11+10h],rbx
00007FF723AF6007 mov qword ptr [r11+18h],rsi
00007FF723AF600B mov qword ptr [r11+20h],rdi
00007FF723AF600F mov qword ptr [r11+8],rcx
00007FF723AF6013 push r12
00007FF723AF6015 push r14
00007FF723AF6017 push r15
00007FF723AF6019 sub rsp,90h
00007FF723AF6020 mov rax,qword ptr [__security_cookie (07FF723AFA008h)]
00007FF723AF6027 xor rax,rsp
00007FF723AF602A mov qword ptr [rsp+80h],rax
00007FF723AF6032 mov rdi,rcx
00007FF723AF6035 mov qword ptr [rsp+50h],rcx
00007FF723AF603A movzx eax,word ptr [rdi+2Ch]
00007FF723AF603E mov word ptr [rsp+48h],ax
00007FF723AF6043 inc ax
00007FF723AF6046 cmp ax,0Ah
00007FF723AF604A ja `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+463h (07FF723AF6463h)
00007FF723AF6050 movsx rax,ax
00007FF723AF6054 lea rdx,[__ImageBase (07FF723AF0000h)]
00007FF723AF605B mov ecx,dword ptr [rdx+rax*4+6494h]
00007FF723AF6062 add rcx,rdx
00007FF723AF6065 jmp rcx
00007FF723AF6067 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6069 xor r15d,r15d
00007FF723AF606C mov dword ptr [rdi+1B0h],r15d
00007FF723AF6073 mov r12d,10000h
00007FF723AF6079 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2E0h (07FF723AF62E0h)
00007FF723AF607E jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6080 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
}
}, aqx::netio::listener("0.0.0.0:55554", 100, 100));
00007FF723AF6082 cmp word ptr [rdi+0Ah],0
00007FF723AF6087 je `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF608D mov edx,1B4h
00007FF723AF6092 mov rcx,rdi
00007FF723AF6095 call operator delete (07FF723AF5504h)
00007FF723AF609A jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF609F xor r15d,r15d
00007FF723AF60A2 mov r12d,10000h
00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr] ******************************************** __coro_frame_ptr.__resume_address is fetched here
00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
00007FF723AF60B2 xor r15d,r15d
00007FF723AF60B5 mov r12d,10000h
00007FF723AF60BB jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2C4h (07FF723AF62C4h)
00007FF723AF60C0 xor r15d,r15d
00007FF723AF60C3 mov r12d,10000h

---------------------------------------------------------------------------------------------------

00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr]
It copies __resume_address at offset 0 of the coroutine frame structure.

00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
It then jumps straight there, treating rdx as the recv_awaitable from that point on.

---------------------------------------------------------------------------------------------------
I am 100% sure this is a compiler bug. The root cause is certainly not as simple as inlining alone; there must be deeper compilation logic behind it, but that is Microsoft's problem.

Original article: https://www.cnblogs.com/babypapa/p/15638498.html