完成端口之二:服务器代码

一:服务器代码:

1.ServerDlg.cpp中启动或停止服务器

CIOCPServer *iocp;
void CServerDlg::OnBnClickedOk2()
{
    iocp->StartServer(10000, AfxGetMainWnd());
}

2.   IOCPServer.h和IOCPServer.cpp

IOCPServer.h:

#pragma once
#include<Mswsock.h>
#include<windows.h>

#define MAX_BUF_SIZE 4096
#define WM_SHOW_MSG WM_USER+100

//    I/O类型
enum IO_TYPE
{
    IO_TYPE_ACCEPT,
    IO_TYPE_READ,
    IO_TYPE_WRITE,
    IO_TYPE_UNKNOWN
};

//扩展重叠的结构
class COverLappedEX
{
public:
    OVERLAPPED m_OLap;
    IO_TYPE m_IOType;     
    char m_szBuf[MAX_BUF_SIZE];

    COverLappedEX(IO_TYPE ioType)
    {
        ZeroMemory(&m_OLap, sizeof(OVERLAPPED));
        m_IOType = ioType;
        ZeroMemory(m_szBuf, MAX_BUF_SIZE);
    }

private:

};

//单句柄数据, 包含监听Socket和接收的客户端信息
class CPerSocketData
{
public:
    SOCKET m_Socket;
    SOCKET m_AccSocket;

    CPerSocketData()
    {
        m_Socket = INVALID_SOCKET;
        m_AccSocket = INVALID_SOCKET;
    }
    ~CPerSocketData()
    {}

private:

};

class CIOCPServer
{
public:
    CIOCPServer(void);
    ~CIOCPServer(void);

public:
    void SetLastErrorMsg(CString strErrorMsg);
    CString GetLastErrorMsg();
    BOOL WinSockInit();
    void AssociateWnd(CWnd *pWnd);
    BOOL StartServer(UINT uListenPort, CWnd *pWnd);
    //发送异步接收客户端连接请求
    BOOL PostAccept(CPerSocketData *pSockData);
    //发送异步接收客户端数据请求
    BOOL PostRecv(CPerSocketData *pSockData);
    BOOL PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes);
    //线程池的线程函数
    static DWORD WINAPI ThreadPoolProc(LPVOID lpParam);
    //关联某一个Socket到I/O完成端口队列
    CPerSocketData *AssignSockToCompletionPort(SOCKET tSocket);
    void StopServer();

private:
    LONG m_ThreadNums;        //线程池工作线程的个数
    HWND m_pWnd;                //窗口句柄
    CString m_strErrorMsg;        
    SOCKET m_ListenSocket;        //监听Socket
    HANDLE m_hCompletionPort;    //完成端口对象句柄
    CList<CPerSocketData *, CPerSocketData *> m_ArrSocketData;
    CList<COverLappedEX *, COverLappedEX *> m_ArrOverLapEx;
};

IOCPServer.cpp:

#include "stdafx.h"
#include "IOCPServer.h"

CIOCPServer::CIOCPServer()
{
    m_ThreadNums = 0;
    //m_pWnd = NULL;
    m_ListenSocket = INVALID_SOCKET;
}

CIOCPServer::~CIOCPServer()
{
    StopServer();
}

void CIOCPServer::AssociateWnd(CWnd *pWnd)
{
    m_pWnd = *pWnd;
}

void CIOCPServer::SetLastErrorMsg(CString strErrorMsg)
{
    m_strErrorMsg = strErrorMsg;
}

CString CIOCPServer::GetLastErrorMsg()
{
    return m_strErrorMsg;
}

BOOL CIOCPServer::WinSockInit()
{
    WSADATA data = { 0 };
    if (WSAStartup(MAKEWORD(2, 2), &data))
        return FALSE;
    if (LOBYTE(data.wVersion) != 2 || HIBYTE(data.wVersion) != 2)
    {
        WSACleanup();
        return FALSE;
    }
    return TRUE;
}

CPerSocketData * CIOCPServer::AssignSockToCompletionPort(SOCKET tSocket)
{
    ASSERT(tSocket != INVALID_SOCKET);
    CPerSocketData *pSockData = new CPerSocketData();
    pSockData->m_Socket = tSocket;
    m_ArrSocketData.AddTail(pSockData);
    CreateIoCompletionPort((HANDLE)tSocket, m_hCompletionPort, (ULONG_PTR)pSockData, 0);
    return pSockData;
}

BOOL CIOCPServer::PostAccept(CPerSocketData *pSockData)
{
    DWORD dwBytesRecv = 0;
    ASSERT(pSockData != NULL);
    COverLappedEX *m_pOverLap = new COverLappedEX(IO_TYPE_ACCEPT);
    m_ArrOverLapEx.AddTail(m_pOverLap);
    pSockData->m_AccSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    BOOL bRet = AcceptEx(pSockData->m_Socket, pSockData->m_AccSocket, m_pOverLap->m_szBuf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytesRecv, &m_pOverLap->m_OLap);
    if (!bRet)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            return FALSE;
        }
    }
    return TRUE;
}

BOOL CIOCPServer::PostRecv(CPerSocketData *pSockData)
{
    ASSERT(pSockData != NULL);
    WSABUF wsaBuf = { 0 };
    COverLappedEX *m_pOverLap = new COverLappedEX(IO_TYPE_READ);
    m_ArrOverLapEx.AddTail(m_pOverLap);
    wsaBuf.buf = m_pOverLap->m_szBuf;
    wsaBuf.len = MAX_BUF_SIZE;
    DWORD dwBytesRecv = 0, dwFlags = 0;
    int iRet = WSARecv(pSockData->m_Socket, &wsaBuf, 1, &dwBytesRecv, &dwFlags, &(m_pOverLap->m_OLap), NULL);
    if (!iRet)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            return FALSE;
        }
    }
    return TRUE;
}

BOOL CIOCPServer::PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes)
{
    DWORD dwSend = 0;
    CPerSocketData *pSocketData = NULL;
    COverLappedEX *pOverLapEx = NULL;

    POSITION pos = m_ArrSocketData.GetHeadPosition();
    while (pos != NULL)
    {
        pSocketData = m_ArrSocketData.GetNext(pos);
        if (pSocketData->m_Socket != m_ListenSocket)
        {
            pOverLapEx = new COverLappedEX(IO_TYPE_WRITE);
            memcpy_s(pOverLapEx->m_szBuf, MAX_BUF_SIZE, lpszText, dwSizeInBytes);
            WSABUF wsaBuf = { 0 };
            wsaBuf.buf = pOverLapEx->m_szBuf;
            wsaBuf.len = dwSizeInBytes;
            m_ArrOverLapEx.AddTail(pOverLapEx);
            WSASend(pSocketData->m_Socket, &wsaBuf, 1, &dwSend, 0, &(pOverLapEx->m_OLap), NULL);
        }
    }
    return TRUE;
}

DWORD WINAPI CIOCPServer::ThreadPoolProc(LPVOID lpParam)
{
    COverLappedEX *pOverLaps = NULL;
    CPerSocketData *pPerSockData = NULL;
    CIOCPServer *pThis = (CIOCPServer *)lpParam;
    ASSERT(pThis != NULL);

    //工作线程+1
    InterlockedIncrement(&pThis->m_ThreadNums);
    BOOL bIORet = FALSE;
    DWORD dwTrans = 0;
    
    while (TRUE)
    {
        bIORet = GetQueuedCompletionStatus(pThis->m_hCompletionPort, &dwTrans, (PULONG_PTR)&pPerSockData, (LPOVERLAPPED *)&pOverLaps, INFINITE);
        
        //客户端退出
        if (dwTrans == 0 && (pOverLaps->m_IOType == IO_TYPE_READ || pOverLaps->m_IOType == IO_TYPE_WRITE))
        {
            closesocket(pPerSockData->m_Socket);
            pThis->m_ArrSocketData.RemoveAt(pThis->m_ArrSocketData.Find(pPerSockData));
            delete pPerSockData;
            pThis->m_ArrOverLapEx.RemoveAt(pThis->m_ArrOverLapEx.Find(pOverLaps));
            delete pOverLaps;
            continue;
        }
        if (bIORet&&pOverLaps&&pPerSockData)
        {
            switch (pOverLaps->m_IOType)
            {
            case IO_TYPE_READ:
            {
                //pThis->m_pWnd->SendMessage(WM_SHOW_MSG, 0, (LPARAM)pOverLaps->m_szBuf);
                //CString* msg = new CString(pOverLaps->m_szBuf);
                CString str((char*)pOverLaps->m_szBuf);
                ::SendMessage(pThis->m_pWnd, WM_SHOW_MSG, (WPARAM)pOverLaps->m_szBuf, (LPARAM)pOverLaps->m_szBuf);
                pThis->PostRecv(pPerSockData);
                pThis->m_ArrOverLapEx.RemoveAt(pThis->m_ArrOverLapEx.Find(pOverLaps));
                delete pOverLaps;
                break;
            }
            case IO_TYPE_WRITE:
            {
                pThis->m_ArrOverLapEx.RemoveAt(pThis->m_ArrOverLapEx.Find(pOverLaps));
                delete pOverLaps;
                break;
            }
            case IO_TYPE_ACCEPT:
            {
                if (pPerSockData->m_AccSocket == INVALID_SOCKET)
                    continue;
                //新接收的客户端与完成端口关联到一起
                CPerSocketData *pData = pThis->AssignSockToCompletionPort(pPerSockData->m_AccSocket);
                //新来一个客户端,激活一个工作线程,最多激活与电脑CPU个数一样多
                QueueUserWorkItem(ThreadPoolProc, pThis, WT_EXECUTELONGFUNCTION);
                pThis->PostRecv(pData);
                pThis->PostAccept(pPerSockData);
                pThis->m_ArrOverLapEx.RemoveAt(pThis->m_ArrOverLapEx.Find(pOverLaps));
                delete pOverLaps;
                break;
            }
            default:
                break;
            }
        }
        else if (!pOverLaps&&!pPerSockData)
        {
            //exit the thread   服务器要关闭
            break;
        }

    }
    InterlockedDecrement(&pThis->m_ThreadNums);
    return TRUE;
}

BOOL CIOCPServer::StartServer(UINT uListenPort, CWnd *pWnd)
{
    m_pWnd = *pWnd;
    if (!WinSockInit())
    {
        SetLastErrorMsg(TEXT("Socket库初始化失败!"));
        return FALSE;
    }

    m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (m_ListenSocket == INVALID_SOCKET)
    {
        SetLastErrorMsg(TEXT("新建Socket失败!"));
        return FALSE;
    }

    sockaddr_in service;
    service.sin_family = AF_INET;
    service.sin_addr.s_addr = INADDR_ANY;
    service.sin_port = htons(uListenPort);

    if (bind(m_ListenSocket, (sockaddr *)&service, sizeof(sockaddr_in)) == SOCKET_ERROR)
    {
        SetLastErrorMsg(TEXT("绑定端口失败!"));
        goto _Error_End;
    }

    if (listen(m_ListenSocket, SOMAXCONN) == SOCKET_ERROR)
    {
        SetLastErrorMsg(TEXT("监听失败!"));
        goto _Error_End;
    }

    m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_hCompletionPort == NULL)
    {
        SetLastErrorMsg(TEXT("完成端口创建失败!"));
        goto _Error_End;
    }

    QueueUserWorkItem(ThreadPoolProc, this, WT_EXECUTELONGFUNCTION);
    CPerSocketData *pSockData = AssignSockToCompletionPort(m_ListenSocket);
    
    PostAccept(pSockData);
    goto _Error_End;

_Error_End:
    /*if (m_ListenSocket != NULL)
        closesocket(m_ListenSocket);
    if (m_hCompletionPort != NULL)
        CloseHandle(m_hCompletionPort);

    WSACleanup();*/
    return TRUE;
}

void CIOCPServer::StopServer()
{
    CPerSocketData *pSocketData = NULL;
    POSITION pos = m_ArrSocketData.GetHeadPosition();

    while (pos != NULL)
    {
        pSocketData = m_ArrSocketData.GetNext(pos);
        closesocket(pSocketData->m_Socket);
        delete pSocketData;
    }
    m_ArrSocketData.RemoveAll();

    COverLappedEX *pOverLap = NULL;
    pos = m_ArrOverLapEx.GetHeadPosition();

    while (pos != NULL)
    {
        pOverLap = m_ArrOverLapEx.GetNext(pos);
        delete pOverLap;
    }
    m_ArrSocketData.RemoveAll();

    while (m_ThreadNums>0)
    {
        PostQueuedCompletionStatus(m_hCompletionPort, 0, 0, NULL);
        Sleep(100);
    }

    if (m_hCompletionPort != NULL)
        CloseHandle(m_hCompletionPort);

    WSACleanup();
}

二   运行结果:

111
原文地址:https://www.cnblogs.com/zwj-199306231519/p/13946630.html