socket I/O模型

  IO模型分为四种,同步阻塞IO、同步非阻塞IO、IO多路复用,异步IO。采用select/ poll/ epoll都可以实现I/O多路复用模型,I/O多路复用就是指多个客户端的连接socket可以通过一个管理器来处理其读写等操作,一般配合同步非阻塞IO来使用。异步IO模型在Windows下可以通过iocp来实现。

1、select

  int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  nfds:windows下第一个参数被忽略,linux下为监听的套接字中最大的那个加1。
  readfds:为监听读套接字集合,如果关心套接字的未决读事件则将套接字加入到这个集合中。
  writefds:为监听写套接字集合,如果关心套接字的未决写事件则将套接字加入到这个集合中。
  exceptfds:为监听异常套接字集合,如果关心套接字的未决异常事件则将套接字加入到这个集合中。
                fd_set为套接字集类型,可以将一个套接字加入到这个集合中,当select返回的时候如果该套接字上没有发生相应的事件则会将该套接字从这个集合中移除。
  timeout:超时时间,NULL为一直等待事件发生。

  添加到fd_set结构的套接字数量是有限制的,通常是FD_SETSIZE。

  windows下FD_SETSIZE在winsock2.h中定义为64,如果想要增加监听的套接字数量则可以自定义FD_SETSIZE的值(必须在包含winsock2,h之前定义),不过自定义的值也不能超过底层的最大支持(通常是1024)。 linux下FD_SETSIZE定义为1024,如果要修改其大小的话必须重新编译内核。

  以下为Windows下select模型示例代码:  

#include <stdio.h>
#include <winsock2.h>
#include <Ws2tcpip.h>
#include <windows.h>

#pragma comment(lib, "WS2_32")    // 链接到WS2_32.lib

int main()
{
    WSADATA wsaData;
    WORD sockVersion = MAKEWORD(2, 2);
    if (::WSAStartup(sockVersion, &wsaData) != 0)
    {
        int iErrno = WSAGetLastError();
        printf("WSAStartup() failed with errno: %d
", iErrno);
        return -1;
    }

    USHORT nPort = 4567;    // 此服务器监听的端口号

    SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sListen == INVALID_SOCKET)
    {
        int iErrno = WSAGetLastError();
        printf("socket() failed: %d
", iErrno);
        goto end;
    }

    unsigned long ul = 1;
    if(ioctlsocket(sListen, FIONBIO, &ul))
    {
        int iErrno = WSAGetLastError();
        printf("ioctlsocket() for listenSocket failed: %d
", iErrno);
        goto end;
    }
    
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nPort);
    sin.sin_addr.S_un.S_addr = INADDR_ANY;//绑定本机所有IP
    if (::bind(sListen, (sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)
    {
        int iErrno = WSAGetLastError();
        printf(" Failed bind() :%d
", iErrno);
        goto end;
    }
    
    if (::listen(sListen, SOMAXCONN))
    {
        int iErrno = WSAGetLastError();
        printf(" Failed listen() :%d
", iErrno);
        goto end;
    }

    // select模型处理过程
    // 1)初始化一个套节字集合fdSocket,用于存储监听套接字和所有的连接套接字
    fd_set fdSocket;        
    FD_ZERO(&fdSocket);
    FD_SET(sListen, &fdSocket);//添加监听套节字句柄到fdSocket套接字集合
    fd_set fdRead;
    sockaddr_in addrRemote;
    int nAddrLen = sizeof(addrRemote);
    while (TRUE)
    {
        // 2)将fdSocket的一个拷贝fdRead传递给select函数的第二个参数,说明我们只关心这些套接字是否可读(监听套接字上有连接到来则监听套接字变为可读),
                //当fdRead中的套接字上有未决读事件发生时,select()返回,并将fdRead中没有未决I/O事件的套节字移除。
        fdRead = fdSocket;
        int nRet = ::select(0, &fdRead, NULL, NULL, NULL);
        if (nRet > 0)
        {
            // 3)select()返回后,检查套接字是否还在fdRead中,在的话说明该套接字上有未读事件发生或有连接到来,然后进一步处理这些未决I/O。
            for (int i = 0; i<(int)fdSocket.fd_count; i++)
            {
                if (FD_ISSET(fdSocket.fd_array[i], &fdRead))
                {
                    if (fdSocket.fd_array[i] == sListen)
                    {
                        if (fdSocket.fd_count < FD_SETSIZE)
                        {
                            memset(&addrRemote, 0, sizeof(addrRemote));
                            SOCKET sNew = ::accept(sListen, (SOCKADDR*)&addrRemote, &nAddrLen);
                            if (sNew == INVALID_SOCKET)
                            {
                                int iErrno = WSAGetLastError();
                                if (iErrno == WSAEWOULDBLOCK)
                                {
                                    continue;
                                }
                                else
                                {
                                    printf("accept() error: %d
", iErrno);
                                    for (int j = 0; j < (int)fdSocket.fd_count; j++)
                                    {
                                        ::closesocket(fdSocket.fd_array[j]);
                                    }
                                    goto end;
                                }
                            }
                            else
                            {
                                unsigned long arg = 1;
                                if (ioctlsocket(sNew, FIONBIO, &arg))
                                {
                                    int iErrno = WSAGetLastError();
                                    printf("ioctlsocket() for clientSocket failed: %d
", iErrno);
                                }

                                FD_SET(sNew, &fdSocket);
                                char ipBuf[64] = { 0 };
                                printf("accepted a connection(%s: %d)
", inet_ntop(AF_INET, (VOID*)&addrRemote.sin_addr, ipBuf, sizeof(ipBuf)), ntohs(addrRemote.sin_port));
                            }
                        }
                        else
                        {
                            printf(" Too much connections, can not accept ! 
");
                            continue;
                        }
                    }
                    else
                    {
                        char szText[4096];
                        int nRecv = ::recv(fdSocket.fd_array[i], szText, sizeof(szText), 0);
                        if (nRecv > 0)                        
                        {
                            szText[nRecv] = '';
                            printf("received data from a connection: %s 
", szText);

                            int nSend = ::send(fdSocket.fd_array[i], "hello", strlen("hello"), 0);
                            if (nSend == SOCKET_ERROR)
                            {
                                int iErrno = WSAGetLastError();
                                if (iErrno == WSAEWOULDBLOCK)
                                {
                                    //The socket is marked as nonblocking and the requested operation would block, for example send tcp buffer is full
                                    printf("send would block
");
                                    ::closesocket(fdSocket.fd_array[i]);
                                    FD_CLR(fdSocket.fd_array[i], &fdSocket);
                                }
                                else
                                {
                                    printf("send() error: %d
", iErrno);
                                    ::closesocket(fdSocket.fd_array[i]);
                                    FD_CLR(fdSocket.fd_array[i], &fdSocket);
                                }
                            }
                        }
                        else     if(nRecv == 0)
                        {
                            ::closesocket(fdSocket.fd_array[i]);
                            FD_CLR(fdSocket.fd_array[i], &fdSocket);
                            printf("a connection has shutdown send operations
");
                        }
                        else
                        {
                            int iErrno = WSAGetLastError();
                            if (iErrno == WSAEWOULDBLOCK)
                            {
                                //The socket is marked as nonblocking and the receive operation would block, for example, tcp recv buffer is empty
                            }
                            else
                            {
                                ::closesocket(fdSocket.fd_array[i]);
                                FD_CLR(fdSocket.fd_array[i], &fdSocket);
                                printf("recv() error: %d
", iErrno);
                            }
                        }
                    }
                }
            }
        }
        else
        {
            int iErrno = WSAGetLastError();
            printf("select() error: %d
", iErrno);
            break;
        }
    }

end:;

    if (sListen != INVALID_SOCKET)
        closesocket(sListen);

    ::WSACleanup();
    printf("server end
");
    return 0;
}
View Code

    以下为Linux下select模型示例代码:

#include <stdio.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

#define SERV_ADDRESS "192.168.152.130"
#define SERV_PORT 7000
#define LISTEN_BACKLOG 1024
#define RECV_BUF_SIZE 4096

int main()
{
    int i, maxi, listensock, maxsock, tempsock;
    int nready, clientsockary[FD_SETSIZE];
    for(i = 0; i<FD_SETSIZE; i++)
        clientsockary[i] = -1;
    ssize_t n;
    char recvbuf[RECV_BUF_SIZE];
    char ipBuf[64];
    
    listensock = socket(AF_INET, SOCK_STREAM, 0);
    if(listensock < 0)
    {
        printf("socket() error: %s
", strerror(errno));
        goto end;
    }

    int flags = fcntl(listensock, F_GETFL);
    if(flags < 0)
    {
        printf("fcntl(F_GETFL) error: %s
", strerror(errno));
        goto end;
    }
    int iRet = fcntl(listensock, F_SETFL, flags | O_NONBLOCK);
    if(iRet < 0)
    {
        printf("fcntl(F_SETFL) error: %s
", strerror(errno));
        goto end;
    }

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, SERV_ADDRESS, (void*)&servaddr.sin_addr);
    servaddr.sin_port = htons(SERV_PORT);
    iRet = bind(listensock, (struct sockaddr*)&servaddr, sizeof(servaddr));
    if(iRet < 0)
    {
        printf("bind() error: %s
", strerror(errno));
        goto end;
    }

    iRet = listen(listensock, LISTEN_BACKLOG);
    if(iRet < 0)
    {
        printf("listen() error: %s
", strerror(errno));
        goto end;
    }

    maxsock = listensock;
    maxi = -1;

    fd_set rset, allset;
    FD_ZERO(&allset);
    FD_SET(listensock, &allset);
    struct sockaddr_in cliaddr;
    socklen_t clilen = sizeof(cliaddr);    
    for(;;)
    {
        rset = allset;
        nready = select(maxsock+1, &rset, NULL, NULL, NULL);//nready save ready descriptors numbers
        if(nready < 0)
        {
            if(errno == EINTR)
            {
                continue;
            }
            else
            {
                printf("select() error:%s
", strerror(errno));
                goto end;
            }                        
        }

        if(FD_ISSET(listensock, &rset))
        {
            //new client connection;
            bzero(&cliaddr, sizeof(cliaddr));
            accept_again:
            tempsock = accept(listensock, (struct sockaddr*)&cliaddr, &clilen);
            if(tempsock < 0)
            {
                if(errno == EINTR)
                {
                    goto accept_again;
                }
                else if(errno == EWOULDBLOCK || errno == ECONNABORTED || errno == EPROTO)
                {
                    //瀵规柟connect()鍚庣珛鍗冲叧闂�彲鑳藉�鑷磋繖绉嶉敊璇?                }
                else
                {
                    printf("accpet() error:%s
", strerror(errno));
                    goto end;
                }
            }
            else
            {
                for(i = 0; i< FD_SETSIZE; i++)
                {
                    if(clientsockary[i] < 0)
                    {
                        clientsockary[i] = tempsock;
                        break;
                    }
                }
                if(i != FD_SETSIZE)
                {
                    FD_SET(tempsock, &allset);
                    if(tempsock > maxsock)
                        maxsock = tempsock;
                    if(i > maxi)
                        maxi = i;//max index in clientsockary[] array
                    
                    memset(ipBuf, 0, sizeof(ipBuf));
                    printf("accept a connect from: %s: %d
", inet_ntop(AF_INET, (void*)&cliaddr.sin_addr, ipBuf, (socklen_t)sizeof(ipBuf)), ntohs(cliaddr.sin_port));
                }
                else
                {
                    close(tempsock);
                    printf("too many clients, can not accpet
");
                }

                if(--nready <= 0)//no more readable descriptors
                    continue;
            }

        }

        for(i = 0; i<= maxi; i++)
        {
            if((tempsock = clientsockary[i]) < 0)
                continue;
            if(FD_ISSET(tempsock, &rset))
            {
                recv_again:
                n = recv(tempsock, recvbuf, RECV_BUF_SIZE, MSG_DONTWAIT);
                if(n > 0)
                {    
                    //received data
                    recvbuf[n] = '';
                    printf("received text from a connection: %s
", recvbuf);
                    send_again:
                    n = send(tempsock, "hello
", strlen("hello"), MSG_DONTWAIT | MSG_NOSIGNAL);
                    if(n < 0)
                    {
                        if(errno == EINTR)
                        {
                            goto send_again;
                        }
                        else if(errno == EAGAIN || errno == EWOULDBLOCK)
                        {
                            //send() would block 
                            printf("send() would block: %s
", strerror(errno));
                            close(tempsock);
                            FD_CLR(tempsock, &allset);
                            clientsockary[i] = -1;
                        }
                        else
                        {
                            //socket error
                            printf("send error: %s
", strerror(errno));
                            close(tempsock);
                            FD_CLR(tempsock, &allset);
                            clientsockary[i] = -1;
                        }
                    }
                }
                else if(n == 0)
                {
                    //peer shutdown send operations
                    printf("a connection shutdown send operations
");
                    close(tempsock);
                    FD_CLR(tempsock, &allset);
                    clientsockary[i] = -1;
                }
                else
                {
                    if(errno == EINTR)
                    {
                        goto recv_again; 
                    }
                    else if(errno == EAGAIN || errno == EWOULDBLOCK)
                    {
                        //recv() would block
                    }
                    else
                    {
                        //socket error
                        printf("recv() error: %s
", strerror(errno));
                        close(tempsock);
                        FD_CLR(tempsock, &allset);
                        clientsockary[i] = -1;
                    }
                    
                }

                if(--nready <= 0)
                    break;//no more readable descriptors
            }
        }
    }

end:;

    if(listensock != -1)
        close(listensock);
    for(i = 0; i<= maxi; i++)
    {
        if(clientsockary[i] < 0)
            continue;
        close(clientsockary[i]);
        clientsockary[i] = -1;
    }
    printf("server end
");
    
    return 0;
}
View Code

 2、WSAAsyncSelect  

  int WSAAsyncSelect(
  SOCKET s,     //套接字句柄
  HWND hWnd, //管理的窗口, 套接字的通知消息将被发到此窗口中
  u_int wMsg,    //消息值,可以在WM_USER以上数值中任意指定一个值
  long IEvent     //指定感兴趣的事件:
           //FD_READ可以读套接字
           //FD_WRITE 可以写套接字
           //FD_ACCEPT 监听套接字有连接接入
           //FD_CONNET 如果套接字连接对方主机,连接完成后会收到这个通知
           //FD_CLOSE 检测到套接字对应的连接被关闭
  )

  WSAAsyncSelect允许应用程序以Windows消息的方式接收网络I/O事件通知,我们一般将WSAAsyncSelect关联到一个窗口上,当有网络事件发生时,WSAAsyncSelect会向关联的窗口发送设定的消息(wParam为发生事件的套接字,lParam为具体事件)。所以我们一般会给这个窗口添加这个消息的消息处理函数,在这个消息处理函数中处理网络事件。

  WSAAsyncSelect()会自动把套接字设为非阻塞模式,它适用于窗口程序且对性能要求不高的网络应用。

 3、WSAEventSelect

  DWORD WSAWaitForMultipleEvents(
  _In_ DWORD cEvents,         //下面事件对象数组中个数
  _In_ const WSAEVENT *lphEvents,  //事件对象数组
  _In_ BOOL fWaitAll,           //是否等待所有事件对象受信
  _In_ DWORD dwTimeout,       //等待时间,毫秒
  _In_ BOOL fAlertable         //一般设为FALSE
  );

   WSAEventSelect模型的基本思路是将事件对象与socket关联起来,当有网络I/O事件的时候相关联的事件对象则会受信。基本流程使用WSACreateEvent()创建事件对象,再调用WSAEventSelect函数将socket和事件对象关联起来并注册关心的网络事件(FD_READ/FD_WRITE/FD_CLOSE),调用WSAWaitForMultipleEvents函数在一个或多个事件对象上等待,当网络事件发生后调用WSAEnumNetworkEvents函数来获得具体发生了什么网络事件。

  WSAEventSelect也会自动把套接字设为非阻塞模式,这个模型的缺点是WSAWaitForMultipleEvents()最多可以等待WSA_MAXIMUM_WAIT_EVENTS(64)个事件对象。

4、epoll

 select模型有一些缺点,如监听的套接字数量最多为1024,即最大并发为1024。而select返回后要依次检查所有监听的套接字是否还在fd_set结构中,这样通过轮询的方式来获得I/O事件的发生会使程序性能随连接数目的增加而线性下降。

  epoll没有select的缺点,它没有最大并发连接数的限制;在有事件发生时,epoll通过回调把该fd放到就绪队列中,只需返回就绪队列即可,而不用遍历监听的全部fd;同时epoll使用mmap加速内核与用户空间的消息传递,进一步提高了效率;epoll函数同时也是线程安全的。

  epoll的接口非常简单,共有三个函数:epoll_create()、epoll_ctl()、epoll_wait(),当然,不使用epoll的时候还需要调用close()来关闭epoll描述符。使用epoll模型的大体流程是调用epoll_create()创建epoll描述符,调用epoll_ctl()将监听套接字加入到epoll中以监听连接到来,然后在无限循环中调用epoll_wait()来等待epoll上的套接字上事件。当有事件发生的时候epoll_wait()返回发生的事件的总数,并且将发生的事件保存在第二个参数中,然后我们就可以逐个检测事件。如果事件是属于监听套接字,这表示有新的连接到来,accept新连接后将连接套接字加入到epoll中以监听连接套接字上的事件。如果事件属于连接套接字则处理对应的读写事件。

  epoll有2种工作方式:水平触发LT和边缘触发ET。LT(level-triggered)水平触发,这是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,传统的select/poll都是这种模型的代表。ET(edge-triggered)边缘触发,这是是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些I/O操作导致那个文件描述符不再为就绪状态了。如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知,不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。对于LT 模式,如果注册了EPOLLOUT,只要该socket可写(发送缓冲区未满),那么就会触发EPOLLOUT。对于ET模式,如果注册了EPOLLOUT, 只有当socket从不可写变为可写(发送缓冲区从满变为有可用空间)的时候才会触发EPOLLOUT。

  以下为linux上使用epoll的示例代码:

#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <errno.h>

#define MAX_EVENTS 16
#define LISTENQ 5
#define BUFFER_SIZE 1024

int main(int argc,char **argv)
{
    //step1, create socket

    int listening_socket;

    //int socket(int domain, int type, int protocol)
    if((listening_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        fprintf(stderr,"create new TCP socket failed:%s
",strerror(errno));
        exit(1);
    }
    fprintf(stdout,"create a new TCP socket:%d
",listening_socket);

    //step1.5, set SO_REUSEADDR & SO_REUSEPORT options
    //SO_REUSEADDR用来设置端口重用:
    //1、一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR让端口释放后立即就可以被再次使用,这在服务器程序重启中很有用。
    //2、使多个监听套接字可以绑定到同一端口号上(但每个监听套接字绑定的IP地址不能相同),比如一台电脑上有多块网卡。
    //3、在UDP多播中,SO_REUSEADDR相当于SO_REUSEPORT。
    //SO_REUSEPORT用来设置地址和端口重用:使多个监听套接字可以绑定到同一个地址和端口号上。
    int value;
    value = 1;
    //int setsockopt(int sockfd, int level, int optname,const void *optval, socklen_t optlen)
    setsockopt(listening_socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
#ifdef SO_REUSEPORT
    value = 1;
    setsockopt(listening_socket, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
#endif

    //step2, bind

    struct sockaddr_in local_ipv4_address;
    memset(&local_ipv4_address, 0, sizeof(local_ipv4_address));

    local_ipv4_address.sin_family = AF_INET;
    local_ipv4_address.sin_addr.s_addr = inet_addr(argv[1]);
    local_ipv4_address.sin_port = htons(atoi(argv[2]));

    //int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
    if(bind(listening_socket, (struct sockaddr *)&local_ipv4_address, sizeof(local_ipv4_address)) < 0)
    {
        fprintf(stderr,"bind() falied:%s
",strerror(errno));
        close(listening_socket);
        exit(1);
    }
    fprintf(stdout,"bound to %s:%s
",argv[1],argv[2]);

    //step3, listen

    //int listen(int sockfd, int backlog)
    if(listen(listening_socket, LISTENQ) < 0)
    {
        fprintf(stderr,"listen() failed:%s
",strerror(errno));
        close(listening_socket);
        exit(1);
    }
    fprintf(stdout,"listening ...
");

    //step4, accept

    //int epoll_create(int size)   创建epoll文件描述符
    //size:监听的套接字的数量
    //成功返回一个非负文件描述符,失败返回-1

    int epfd;
    if((epfd = epoll_create(1024)) < 0)
    {
        fprintf(stderr,"epoll_create() failed:%s
",strerror(errno));
        close(listening_socket);
        exit(1);
    }
    fprintf(stdout,"epoll_create() successfully, epfd = %d
", epfd);

#if 0
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 添加/修改/删除epoll上的套接字

    epfd: epoll文件描述符
    op:   动作,EPOLL_CTL_ADD:添加
                EPOLL_CTL_MOD:修改
                EPOLL_CTL_DEL:删除
    fd:   套接字
    event:监听事件

    成功返回0,失败返回-1

    struct epoll_event 
    {
        __uint32_t   events;      /* Epoll events */           事件类型
        epoll_data_t data;        /* User data variable */     套接字相关数据
    };

    enents: EPOLLIN  套接字可读
            EPOLLOUT 套接字可写
            EPOLLPRI 套接字有紧急数据可读
            EPOLLERR 套接字发生错误
            EPOLLHUP 套接字被挂断
            EPOLLET:设置边缘触发(Edge Triggered)模式,默认为水平触发(Level Triggered)模式
        
    typedef union epoll_data 
    {
        void    *ptr;
        int      fd;    套接字
        uint32_t u32;
        uint64_t u64;
    }epoll_data_t;
    
#endif

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.data.fd = listening_socket;
    event.events = EPOLLIN | EPOLLOUT; 

    if(epoll_ctl(epfd, EPOLL_CTL_ADD, listening_socket, &event) < 0)
    {
        fprintf(stderr,"epoll_ctl() failed:%s
",strerror(errno));
        close(epfd);  //关闭epoll描述符
        close(listening_socket);
        exit(1);
    }

    int n;
    struct epoll_event events[MAX_EVENTS];

    for(;;)
    {
        //int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) 等待epoll内某个文件描述符的事件产生
        //epfd:     epoll文件描述符
        //events:   保存发生的事件
        //maxevents:每次epoll_wait处理的最大事件个数,发生的事件超过这个值的话事件会被下一次epoll_wait调用处理
        //timeout:  等待时间(毫秒),-1为无限期等待

        //成功返回准备好的文件描述符个数,失败返回-1,没有文件描述符准备好(时间到)返回0

        if((n = epoll_wait(epfd, events, MAX_EVENTS, 1000)) < 0)
        {
            fprintf(stderr,"epoll_wait() failed:%s
",strerror(errno));
            //how to do?
        }
        else if(n == 0)
        {
            fprintf(stdout,"Timeout...
");
        }
        else
        {
            int i;
            for(i=0;i<n;i++)
            {
                if(events[i].data.fd == listening_socket)//listening_socket可读,表示有新的连接
                {
                    fprintf(stdout,"ready to accept new incomming connection!
");

                    int new_accepted_socket;
                    struct sockaddr_in peer_ipv4_address;
                    socklen_t peer_ipv4_address_length;
                    peer_ipv4_address_length = sizeof(peer_ipv4_address);

                    //int accept(int sockfd, struct sockaddr *addr, socklen_t * addrlen);
                    if((new_accepted_socket = accept(events[i].data.fd, (struct sockaddr *)&peer_ipv4_address, &peer_ipv4_address_length)) < 0)
                    {
                        fprintf(stderr,"accept() failed:%s
",strerror(errno));
                        //how to do ?
                    }
                    else
                    {
                        fprintf(stdout,"accept a new connection #%d from %s:%d.
",new_accepted_socket, inet_ntoa(peer_ipv4_address.sin_addr), ntohs(peer_ipv4_address.sin_port));

                        event.data.fd = new_accepted_socket;
                        event.events = EPOLLIN | EPOLLOUT | EPOLLERR;
                        if(epoll_ctl(epfd, EPOLL_CTL_ADD, new_accepted_socket, &event) < 0)//将新的连接套接字加入到epoll以监听其事件
                        {
                            fprintf(stderr,"epoll_ctl() failed:%s
",strerror(errno));
                            close(new_accepted_socket);
                        }
                        else
                        {
                            fprintf(stdout,"add socket #%d to epoll watched descriptors.
", new_accepted_socket);
                        }
                    }
                }
                else
                {
                    if(events[i].events & EPOLLIN) //套接字可读
                    {
                        fprintf(stdout,"socket #%d is ready for reading.
", events[i].data.fd);

                        char buffer[BUFFER_SIZE];
                        ssize_t read_bytes;

                        //ssize_t read(int fd, void *buf, size_t count) 
                        if((read_bytes = read(events[i].data.fd, buffer, sizeof(buffer)-1)) < 0) 
                        {
                            fprintf(stderr,"read() failed on socket #%d :%s
",events[i].data.fd, strerror(errno));
                            //how to do ?
                        }
                        else if(read_bytes == 0)
                        {
                            fprintf(stdout,"on socket #%d ,connection closed by peer.
", events[i].data.fd);
                            close(events[i].data.fd);
                        }
                        else
                        {
                            buffer[read_bytes] = '';
                            fprintf(stdout,"read %d bytes on socket #%d :'%s'
",read_bytes, events[i].data.fd, buffer);
                        }
                    }

                    if(events[i].events & EPOLLOUT) //套接字可写
                    {
                        fprintf(stdout,"socket #%d is ready for writting.
", events[i].data.fd);
                        //write();     
                    }
                    
                    if(events[i].events & EPOLLERR) //套接字出错
                    {
                        fprintf(stdout,"socket #%d error!
", events[i].data.fd);
                        //how to do ?     
                    }
                }
            }
        }
    }

    close(epfd);
    close(listening_socket);
    return 0;
}
View Code

 5、Reactor和Proactor

   网络程序的整体设计有两种模式:反应器模式Reactor和前摄器模式Proactor。Reactor和Proactor的核心区别就是I/O操作分别是同步和异步的,这就导致了两种模式在设计和处理上的不同。 知乎上网友“闹钟”对于二者有一个简单诙谐的说明:  (https://www.zhihu.com/question/26943938

   reactor:能收了你跟俺说一声.

   proactor: 你给我收十个字节,收好了跟俺说一声.

   采用Reactor的一般流程是先在Reactor中注册感兴趣的就绪事件,并在注册时候指定handler(回调函数),当有数据请求的时候,在Reactor中会触发刚才注册的事件,并调用对应的handler去处理就绪的事件。Reactor的核心是I/O多路复用:select/poll/epoll,采用Reactor模式的网络库有libevent、muduo、netty。

   与Reactor不同的是,Proactor关注的不是就绪事件而是完成事件:当提交的事件异步完成后Proactor才会触发对应的回调handler。由此可见,windows下的iocp编程方法符合Proactor的设计思想,而采用Proactor设计模式的网络库有boost的asio。

   常见的Reactor模式程序结构有以下四种(以下转载和参考自陈硕的博客园博文:Muduo 多线程模型:一个 Sudoku 服务器演变):

   ①、sinle reactor

   特点:单个Reactor,并且在一个线程中处理I/O请求和计算,不太适合 CPU 密集或执行时间比较长的应用。上面的epoll示例代码就是这种结构。

    

  ②、 single reactor + thread pool

   特点: 把I/O交给reactor,计算交给thread pool,适合计算密集或执行时间比较长的应用。如果 IO 的压力比较大,一个 reactor 忙不过来,可以试试③ multiple reactors。

    

  ③、multiple reactors 

   特点:有一个 main reactor 负责 accept 连接,然后把连接挂在某个 sub reactor 中,该连接的所有操作(I/O + 计算)都在那个 sub reactor 所处的线程中完成,sub reactor的个数通常根据 CPU 核数确定。这是一个适应性很强的多线程 IO 模型,能够处理突发大量I/O和兼顾计算密集,因此是muduo的默认线程模型。

    

  ④、multiple reactors + thread pool

    特点:在③multiple reactors的基础上, sub reactor处理I/O操作,线程池来处理计算,能够适应突发I/O + 计算密集的应用。

    

6、IOCP

  个人感觉windows上的IOCP用起来有些难度,其难点在于多个线程共用一个IOCP完成事件分离器带来的同步问题和异步I/O的完成通知机制。IOCP的一个特点就是提前投递一些操作而不是像select/epoll那样等事件到来的时候再进行相关操作,比如IOCP是提前投递accept和read操作,等这些操作完成的时候会得到通知,然后再进行相应处理。IOCP常用的函数有CreateIoCompletionPort、GetQueuedCompletionStatus、AcceptEx、WSARecv、WSASend等。

  CreateIoCompletionPort()有两个功能,一个是创建完成端口对象,一个是关联套接字到完成端口上以监听套接字的完成事件,函数声明如下:

  CreateIoCompletionPort(
    __in HANDLE FileHandle,
    __in_opt HANDLE ExistingCompletionPort,
    __in ULONG_PTR CompletionKey,
    __in DWORD NumberOfConcurrentThreads
  );

  当创建完成端口对象的时候前三个参数我们可以忽略,分别传INVALID_HANDLE_VALUE, 0, 0就行,第四个参数指定在完成端口上同时运行的工作线程的数量,0为将这个参数设为当前CPU核心数量。如果是计算密集型任务(也就是一些复杂运算,逻辑处理),那么cpu使用率较高,线程数一般可以设为CPU数量 + 1,对于I/O密集型,cpu使用率较低,程序中会存在大量I/O操作占据时间,导致线程空余时间出来,通常就需要开cpu核数*2 + 1的线程数,当线程进行I/O操作cpu空暇时启用其他线程继续使用cpu,提高cpu使用率。在这里还要解释一下I/O密集型任务对CPU使用率较低的情况,当前线程进行I/O调用的时候实际上是由DMA进行大部分的I/O工作,I/O完成后DMA会通知CPU,在DMA进行I/O操作的这段时间里CPU是一直等待通知,所以实际上是空闲出来的,也就是当前线程是空闲的(阻塞等待I/O完成),所以我们可以再多开个线程来利用这里空闲下来的CPU去干别的任务,这就是对于I/O密集型要开两倍CPU的线程数的原因。

  如果是关联套接字到完成端口上则第一个参数为要关联的套接字,第二个参数为完成端口对象,第三个参数为完成键,GetQueuedCompletionStatus()函数的第三个参数又可以获得这个值,所以当投递的accept完成后我们将新的连接套接字关联到完成端口上的时候,这个参数就可以传递保存这个连接的相关信息的内存地址(这块内存又称为per-handle Data句柄唯一数据),这样当该连接上有异步I/O事件完成的时候我们可以通过GetQueuedCompletionStatus()函数获得连接的相关信息。第四个参数我们将它忽略,设为0。

  GetQueuedCompletionStatus()可以获得投递的异步I/O的结果,函数声明如下:

  GetQueuedCompletionStatus(
    __in HANDLE CompletionPort,
    __out LPDWORD lpNumberOfBytesTransferred,
    __out PULONG_PTR lpCompletionKey,
    __out LPOVERLAPPED *lpOverlapped,
    __in DWORD dwMilliseconds
  );

  第一个参数为完成端口对象,第二个参数可以获得异步I/O传输的字节数,第三个参数可以获得上面所说的完成键,即per-handle Data句柄唯一数据,通过第四个参数可以获得投递异步I/O时的per-I/O Data即I/O唯一数据,这个I/O唯一数据里包含了投递的异步I/O的相关信息。第五个参数指定等待时间。

  其它相关的异步函数为AcceptEx、WSARecv、WSASend,分别用来投递异步accept、异步recv、异步send,这几个函数包含一个OVERLAPPED对象指针的参数,通过这个参数可以传递给这些异步I/O一个上文所说的I/O唯一数据,当GetQueuedCompletionStatus()返回的时候又可以通过lpOverlapped参数获得这个I/O唯一数据。

  使用IOCP的大体流程:首先还是创建监听socket,bind绑定本机IP地址和端口号,listen设置监听模式;然后调用CreateIoCompletionPort()创建完成端口,调用CreateIoCompletionPort()将监听套接字关联到
完成端口上,在监听套节字上投递一定数量的AcceptEx;然后再创建工作线程,在工作者线程中调用GetQueuedCompletionStatus()来获得投递的异步I/O操作的完成情况,比如如果是AcceptEx完成则表示一个
连接accept成功,这时候应该调用CreateIoCompletionPort()将这个连接套接字关联到完成端口,然后再向这个连接套接字投递几个WSARecv()操作以备数据到来。如果投递的WSARecv()完成GetQueuedCompletionStatus()
同样会获得通知。创建工作线程的数量一般是多于创建完成端口时指定的线程同时运行的数量,但在完成端口上同时运行的工作线程只会是创建完成端口时指定的数量,这样做的目的是某个工作线程被blocking时,
会有备用的线程被唤醒来处理完成事件。

  IOCP模型的两个问题

  1、缓冲区锁定问题

    每次调用异步I/O函数的时候其发送或接收缓冲区都会被锁定,系统对锁定数量有一个上限,如果我们对于每个连接都投递大量的读操作,那么当达到这个上限的时候异步I/O操作会返回WSAENOBUFS错误。解决方法一个是控制连接数量,另一个办法是对于每个连接到  来后我们只投递一个0字节缓冲区的读操作,因为缓冲区大小为0,所以这时候缓冲区不会被锁定。当0字节的读操作返回的时候说明有数据到来,这时候可以直接调用非阻塞的同步读操作来读数据,直到非阻塞读操作返回WSAEWOULDBLOCK错误后说明数据已经读取  完,也可以投递一个或多个异步读操作来接收数据。

  2、数据乱序问题

    比如我们在一个连接上投递了两个读操作,当两个读操作都连续完成的时候因为线程切换问题可能我们先收到的是后一个读操作的完成通知,这就造成了接收数据的乱序。解决方法是我们给每一个读操作添加一个序号(per-I/O数据里增加一个变量来保存序号),每个  连接添加一个要读取的序号(连接的per-handle数据增加一个变量来保存),当读操作完成的时候我们先检查这个序号是否就是要读取的序号,如果是的话就直接处理数据,如果不是的话先在乱序数据容器中查找是否有要读取的数据,然后将这个读操作的相关数据(per-I/O数据)保存到乱序数据的容器中。

原文地址:https://www.cnblogs.com/milanleon/p/7057900.html