Windows socket I/O模型 之 select(2)

Windows socket I/O模型 之  select(1)中。我们仅仅是在console中简单的模拟了select的处理方法。


还有非常多特性不能改动。比方仅仅能写,不能读。

没使用线程。也没有同步不同的读写线程。


先谈谈个人眼下对select的理解。

select就是监控一组套接字的变化情况。

比方一个fd_set的变量(暂且定义为fdRead)里面有5个套接字。当你传给select后,如果仅仅有2个被触发。

那么这个fdRead也就改变了。这是为什么select须要从全局fd_set拷贝一份给select的原因。


然后我们依据套接字的变化情况做对应的处理就OK了。可是大并发量还没有測试。


今晚我改进了。以下大家见代码。

。。。

// Select_Server.cpp : Defines the entry point for the console application.
// 服务端

#include "stdafx.h"


#define STR_SERVER_IP           "127.0.0.1"
#define INT_SERVER_PORT         8001
#define INT_DATABUFFER_SIZE     256


SOCKET g_soClient;
typedef std::list<SOCKET>   LstSocket;
LstSocket                   g_lstSoClient;

SOCKET g_soServer;
fd_set g_fdSocketSet;

// for thread synchronize
CCriSec     g_criSec;

DWORD WINAPI ThreadRead(LPVOID lpvParam)
{
    int iResult = 0;
    sockaddr_in addrAccept;
    int iAcceptLen = sizeof(addrAccept);
    SOCKET soClient;

    FD_ZERO(&g_fdSocketSet);
    FD_SET(g_soServer, &g_fdSocketSet);
    fd_set fdRead, fdWrite;
    while( TRUE ) {
        // initialize
        FD_ZERO(&fdRead);
        FD_ZERO(&fdWrite);
        fdRead = g_fdSocketSet;
        fdWrite = g_fdSocketSet;

        int n1 = fdRead.fd_count;
        int n2 = fdWrite.fd_count;
        int n3 = g_fdSocketSet.fd_count;
        int iResult = select(0, &fdRead, &fdWrite, NULL, NULL);
        if( iResult == SOCKET_ERROR) {
            break;
        }

        if(FD_ISSET(g_soServer, &fdRead)) {
            soClient = accept(g_soServer, (sockaddr*)&addrAccept, &iAcceptLen);

            CCriSecLock lock(g_criSec);
            if(soClient == INVALID_SOCKET) {
                continue;
            } else {
                printf("
[%s:%d] has connected to server!
", inet_ntoa(addrAccept.sin_addr),
                    ntohs(addrAccept.sin_port));
                FD_SET(soClient, &g_fdSocketSet);
            }
        } else {
            // check read
            for(int i=0; i < (int)fdRead.fd_count; i++) {
                if ( fdRead.fd_array[i] == g_soServer ) {
                    continue;
                }

                if( FD_ISSET(fdRead.fd_array[i], &g_fdSocketSet) ) {
                    sockaddr_in name;
                    int namelen = sizeof(sockaddr_in);
                    getpeername(fdRead.fd_array[i], (sockaddr *)&name, &namelen);

                    char buf[256] = {0};
                    int len = 256;
                    int ret = recv(fdRead.fd_array[i], buf, len, 0);
                    CCriSecLock lock(g_criSec);
                    if( ret == SOCKET_ERROR ) {
                        int nErr = GetLastError();
                        if( nErr == 10054 ) {
                            // Connection reset by peer.
                            FD_CLR(fdRead.fd_array[i], &g_fdSocketSet);
                            printf("
[%s:%d] disconnect from server.
", inet_ntoa(name.sin_addr), ntohs(name.sin_port) );
                        } else {
                            printf("
fdread failed with %d
", nErr);
                        }
                    } else {
                        printf("
Recv from [%s:%d] : %s
", inet_ntoa(name.sin_addr), ntohs(name.sin_port), buf);
                    }
                }
            }
            
            // check write
            static bool b11 = false;
            for(int i=0; i < (int)fdWrite.fd_count; i++) {
                if( FD_ISSET(fdWrite.fd_array[i], &g_fdSocketSet) ) {
                    char buf[256] = "abcd";
                    int len = 256;
                    if( !b11 ) {
                        b11 = true;
                        //send(fdWrite.fd_array[i], buf, len ,0);
                    }
                }
            }
        }
    }

    return 0;
}

DWORD WINAPI ThreadWrite(LPVOID lpvParam)
{
    std::string str;
    {
        CCriSecLock lock(g_criSec);
        std::cout << "Please input message to client: ";
    }
    while( getline(std::cin, str) ) {
        if( str.compare("exit") == 0 ) {
            {
                CCriSecLock lock(g_criSec);
                printf("close write thread
");
            }
            break;
        }

        for(int i = 1; i < (int)g_fdSocketSet.fd_count; i++) {
            send(g_fdSocketSet.fd_array[i], str.data(), (int)str.size(), 0);
        }
    }
    
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    WORD dwVersion = MAKEWORD(2, 2);
    WSAData wsaData;
    WSAStartup(WINSOCK_VERSION,&wsaData);

    g_soServer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (INVALID_SOCKET == g_soServer) {
        printf("Failed to create socket!
");
        WSACleanup();
        return -1;
    }

    sockaddr_in addrServer;
    memset(&addrServer,0,sizeof(sockaddr_in));
    addrServer.sin_family = AF_INET;
    addrServer.sin_port = htons(INT_SERVER_PORT);
    addrServer.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

    int iResult;

    bool bReuseAddr = true;
    iResult = setsockopt(g_soServer, SOL_SOCKET, SO_REUSEADDR, (char *)&bReuseAddr, sizeof(bReuseAddr));
    if(SOCKET_ERROR == iResult) {
        printf("Failed to set resueaddr socket!
");
        WSACleanup();
        return -1;
    }

    //设置非堵塞方式连接
    unsigned long cmd = 1;
    iResult = ioctlsocket(g_soServer, FIONBIO, &cmd);

    iResult = bind(g_soServer, (sockaddr *)&addrServer, sizeof(addrServer));
    if (SOCKET_ERROR == iResult) {
        printf("Failed to bind address!
");
        WSACleanup();
        return -1;
    }

    if (0 != listen(g_soServer, 5)) {
        printf("Failed to listen client!
");
        WSACleanup();
        return -1;
    }

    printf("Start server...
");

    HANDLE hWorkRead = CreateThread(NULL, 0, ThreadRead, NULL, 0, NULL);
    HANDLE hWorkWrite = CreateThread(NULL, 0, ThreadWrite, NULL, 0, NULL);

    ::WaitForSingleObject(hWorkRead, INFINITE);
    ::WaitForSingleObject(hWorkWrite, INFINITE);

    WSACleanup();

    return 0;
}

以下是client代码:

// Select_Client.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"



#define INT_SERVER_PORT 8001
#define STR_SERVER_IP "127.0.0.1"
#define INT_DATABUFFER_SIZE 256
#define STR_EXIT "exit"
#define STR_RECV "recv"

//
SOCKET g_soClient;
fd_set g_fdSocketSet;

// for thread synchronize
CCriSec     g_criSec;

DWORD WINAPI ThreadWorker(LPVOID lpvParam)
{
    FD_ZERO(&g_fdSocketSet);
    FD_SET(g_soClient, &g_fdSocketSet);
    fd_set fdRead, fdWrite;
    
    while( TRUE ) {
        // initialize
        FD_ZERO(&fdRead);
        FD_ZERO(&fdWrite);
        fdRead = g_fdSocketSet;
        fdWrite = g_fdSocketSet;
        
        int iResult = select(0, &fdRead, &fdWrite, NULL, NULL);
        if( iResult == SOCKET_ERROR) {
            break;
        } else if( iResult == 0 ) {
            printf("Time limit expired
");
        } else {
            // check read
            if (FD_ISSET(fdRead.fd_array[0], &g_fdSocketSet)) {
                sockaddr_in name;
                int namelen = sizeof(sockaddr_in);
                getpeername(fdRead.fd_array[0], (sockaddr *)&name, &namelen);

                char buf[256] = {0};
                int len = 256;
                int ret = recv(fdRead.fd_array[0], buf, len, 0);
                CCriSecLock lock(g_criSec);
                if( ret == SOCKET_ERROR ) {
                    int nErr = GetLastError();
                    if( nErr == 10054 ) {
                        // Connection reset by peer.
                        FD_CLR(fdRead.fd_array[0], &g_fdSocketSet);
                        printf( "
[%s:%d] is closed.
", inet_ntoa(name.sin_addr), ntohs(name.sin_port) );
                    } else {
                        printf("fdread failed with %d
", nErr);
                    }
                } else {
                    CCriSecLock lock(g_criSec);
                    printf("
Recv from [%s:%d] : %s
", inet_ntoa(name.sin_addr), ntohs(name.sin_port), buf);
                }
            }

            // check write
            if (FD_ISSET(fdWrite.fd_array[0], &g_fdSocketSet)) {
                int a=2;
                int b=a;
            }
        }
    }
    
    return 0;
}

void main(void)
{
    WSAData wsaData;
    WSAStartup(WINSOCK_VERSION,&wsaData);
  
    g_soClient = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
    if (INVALID_SOCKET == g_soClient) {
        printf("Failed to create client!
");
        WSACleanup();
    }
  
    sockaddr_in addrServer;
    addrServer.sin_addr.S_un.S_addr = inet_addr(STR_SERVER_IP);
    addrServer.sin_family = AF_INET;
    addrServer.sin_port = htons(INT_SERVER_PORT);

    int iResult;

    //设置非堵塞方式连接
    //unsigned long ul = 1;
    //iResult = ioctlsocket(g_soClient, FIONBIO, (unsigned long*)&ul);
    
    iResult = connect(g_soClient, (sockaddr *)&addrServer, sizeof(sockaddr_in));
    if (SOCKET_ERROR == iResult) {
        printf("Failed to connect server!(Error: %d)
", ::WSAGetLastError());
        WSACleanup();
        return;
    }
    
    HANDLE hWorker = CreateThread(NULL, 0, ThreadWorker, NULL, 0, NULL);

    std::string str;
    std::cout << "Please input message to server: ";
    while( getline(std::cin, str) ) {
        send(g_soClient, str.data(), str.size(), 0);
        std::cout << "Please input message to client: ";
    }

    closesocket(g_soClient);
    WSACleanup();
}

头文件

// stdafx.h : include file for standard system include files,
// or project specific include files that are used frequently, but
// are changed infrequently
//

#pragma once

#ifndef _WIN32_WINNT		// Allow use of features specific to Windows XP or later.                   
#define _WIN32_WINNT 0x0501	// Change this to the appropriate value to target other versions of Windows.
#endif						

#include <stdio.h>
#include <tchar.h>
#include <string>
#include <iostream>
#include <WINSOCK2.H>

#pragma comment(lib,"ws2_32.lib")



// TODO: reference additional headers your program requires here
#include "CriticalSection.h"


// CriticalSection.h

/*
 * Copyright: JessMA Open Source (ldcsaa@gmail.com)
 *
 * Version	: 2.3.2
 * Author	: Bruce Liang
 * Website	: http://www.jessma.org
 * Project	: https://github.com/ldcsaa
 * Blog		: http://www.cnblogs.com/ldcsaa
 * Wiki		: http://www.oschina.net/p/hp-socket
 * QQ Group	: 75375912
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
/****************************************************************************
*																			*
* CriticalSection.h 														*
*																			*
* Create by :																*
* Kingfisher	2003-10-15													*
* 																			*
* Description: 																*
* 封装Win32临界量对象和相互排斥量内核对象											*
****************************************************************************/

#pragma once

#include <windows.h>

class CCriSec
{
public:
	CCriSec()		{::InitializeCriticalSection(&m_crisec);}
	~CCriSec()		{::DeleteCriticalSection(&m_crisec);}

	void Lock()		{::EnterCriticalSection(&m_crisec);}
	void Unlock()	{::LeaveCriticalSection(&m_crisec);}

private:
	CCriSec(const CCriSec& cs);
	CCriSec operator = (const CCriSec& cs);

private:
	CRITICAL_SECTION    m_crisec;
};

class CCriSec2
{
public:
	CCriSec2(BOOL bInitialize = TRUE)
	{
		if(bInitialize)
		{
			m_pcrisec = new CRITICAL_SECTION;
			::InitializeCriticalSection(m_pcrisec);
		}
		else
			m_pcrisec = NULL;
	}

	~CCriSec2() {Reset();}

	void Attach(CRITICAL_SECTION* pcrisec)
	{
		Reset();
		m_pcrisec = pcrisec;
	}

	CRITICAL_SECTION* Detach()
	{
		CRITICAL_SECTION* pcrisec = m_pcrisec;
		m_pcrisec = NULL;
		return pcrisec;
	}

	void Lock()		{::EnterCriticalSection(m_pcrisec);}
	void Unlock()	{::LeaveCriticalSection(m_pcrisec);}

private:
	CCriSec2(const CCriSec2& cs);
	CCriSec2 operator = (const CCriSec2& cs);

	void Reset()
	{
		if(m_pcrisec)
		{
			::DeleteCriticalSection(m_pcrisec);
			delete m_pcrisec;
			m_pcrisec = NULL;
		}
	}

private:
	CRITICAL_SECTION*    m_pcrisec;
};


template<class CLockObj> class CLocalLock
{
public:
	CLocalLock(CLockObj& obj) : m_lock(obj) {m_lock.Lock();}
	~CLocalLock() {m_lock.Unlock();}
private:
	CLockObj& m_lock;
};

typedef CLocalLock<CCriSec>		CCriSecLock;
typedef CLocalLock<CCriSec2>	CCriSecLock2;


原文地址:https://www.cnblogs.com/lytwajue/p/6834850.html