LINUX网络编程 IO 复用

参考《linux高性能服务器编程》

LINUX下处理多个连接时候,仅仅使用多线程和原始socket函数,效率十分低下

于是就出现了 select、poll、epoll 等 IO 复用函数。

这里讨论性能最优的epoll IO复用

用户将需要关注的socket连接使用IO复用函数放进一个事件表中,每当事件表中有一个或者多个SOCKET连接出现读写请求时候,则进行处理

事件表使用一个额外的文件描述符来标识。文件描述符使用 epoll_create函数创建

#include <sys/epoll.h>

int epoll_create(int size); // 自 Linux 2.6.8 起 size 参数被忽略,但必须传入大于 0 的值

操作事件表使用epoll_ctl函数进行控制

操作类型有以下 3 种:

EPOLL_CTL_ADD  EPOLL_CTL_MOD  EPOLL_CTL_DEL

在一段超时时间内等待事件表上的事件 使用epoll_wait();

函数范例如下:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/*
 * Put `fd` into non-blocking mode.
 *
 * Returns the file status flags that were in effect before the call (so a
 * caller could restore them), or -1 if either fcntl() call fails; errno is
 * left as set by fcntl().
 */
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    if( old_option == -1 )
    {
        /* Do not OR O_NONBLOCK into an error value and feed it to F_SETFL. */
        return -1;
    }
    if( fcntl( fd, F_SETFL, old_option | O_NONBLOCK ) == -1 )
    {
        return -1;
    }
    return old_option;
}

/*
 * Register `fd` for read events (EPOLLIN) on the epoll instance `epollfd`
 * and switch it to non-blocking mode.
 *
 * epollfd   - epoll instance to add the descriptor to.
 * fd        - descriptor to watch.
 * enable_et - when true the descriptor is registered edge-triggered
 *             (EPOLLET); otherwise level-triggered.
 *
 * A failing epoll_ctl() is reported on stderr instead of being ignored.
 */
void addfd( int epollfd, int fd, bool enable_et )
{
    struct epoll_event event;
    /* Zero the whole struct so no indeterminate bytes reach the kernel. */
    memset( &event, 0, sizeof( event ) );
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }

    /* Make the descriptor non-blocking before epoll can report it, so the
     * read-until-EAGAIN loops used with it can never block. */
    setnonblocking( fd );
    if( epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ) == -1 )
    {
        perror( "epoll_ctl(EPOLL_CTL_ADD)" );
    }
}


void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once
" );
            while( 1 )
            {
                memset( buf, '', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later
" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s
", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened 
" );
        }
    }
}

/*
 * Single-threaded edge-triggered echo-to-stdout server.
 *
 * Usage: <program> ip_address port_number
 *
 * Binds a TCP listening socket to the given IPv4 address and port, registers
 * it on an epoll instance and dispatches every batch of ready events to et().
 * Returns 1 on a usage/address error, 0 after the event loop ends.
 */
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    memset( &address, 0, sizeof( address ) );
    address.sin_family = AF_INET;
    if( inet_pton( AF_INET, ip, &address.sin_addr ) != 1 )
    {
        /* Without this check a malformed address silently binds 0.0.0.0. */
        printf( "invalid IPv4 address: %s\n", ip );
        return 1;
    }
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    struct epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ready = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ready < 0 )
        {
            if( errno == EINTR )
            {
                /* Interrupted by a signal: not a failure, just wait again. */
                continue;
            }
            printf( "epoll failure\n" );
            break;
        }

        et( events, ready, epollfd, listenfd );
    }

    close( epollfd );
    close( listenfd );
    return 0;
}

  

再尝试一个多线程版本的代码:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Argument bundle handed to a worker thread: the epoll instance plus the
 * connection socket the thread should read from. */
struct fds
{
   int epollfd;   /* epoll instance the socket is registered on */
   int sockfd;    /* connection socket to receive data from */
};

void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d
", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '', BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection
" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
               // reset_oneshot( epollfd, sockfd );
                printf( "read later
" );
                break;
            }
        }
        else
        {
            printf( "get content: %s
", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d
", sockfd );
}

/*
 * Switch `fd` to non-blocking mode.
 *
 * Returns the previous file status flags, or -1 when fcntl() fails (errno is
 * left as set by fcntl()).
 */
int setnonblocking(int fd)
{
	int old_option = fcntl(fd,F_GETFL);
	if(old_option == -1){
		/* An error value must not be OR-ed with O_NONBLOCK and written back. */
		return -1;
	}
	int new_option = old_option | O_NONBLOCK;
	if(fcntl(fd,F_SETFL,new_option) == -1){
		return -1;
	}
	return old_option;
}

/*
 * Add `fd` to the epoll instance `epollfd` for EPOLLIN and make it
 * non-blocking.
 *
 * enable_et - true registers the descriptor edge-triggered (EPOLLET).
 *
 * An epoll_ctl() failure is printed to stderr rather than dropped.
 */
void addfd(int epollfd,int fd,bool enable_et)
{
	struct epoll_event event;
	/* Clear the struct so the unused part of the data union is defined. */
	memset(&event,0,sizeof(event));
	event.data.fd = fd;
	event.events = EPOLLIN;

	if(enable_et){
		event.events |= EPOLLET;
	}
	/* Non-blocking first: once registered, handlers read until EAGAIN. */
	setnonblocking(fd);
	if(epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event) == -1){
		perror("epoll_ctl(EPOLL_CTL_ADD)");
	}
}

void et(epoll_event* events,int number,int epollfd,int listenfd){
	char buf[BUFFER_SIZE];
	for(int i = 0; i < number;i++){
		int sockfd = events[i].data.fd;
		if(sockfd == listenfd){
			struct sockaddr_in client_address;
			socklen_t client_addrlength = sizeof(client_address);
			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
			addfd(epollfd,connfd,true);
		}else if(events[i].events & EPOLLIN){
			/*
			printf("event trigger once
");
			while(1){
				memset(buf,'',BUFFER_SIZE);
				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
				if(ret < 0){
					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
						printf("read later
");
						break;
					}
					close(sockfd);
					break;
				}else if(ret == 0){
					close(sockfd);
				}else{
					printf("get %d bytes of content: %s 
",ret,buf);
				}
			}*/
			 pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
		}else{
			printf("something else happened 
");
		}
	}
}

/*
 * Multi-threaded variant: listens on 127.0.0.1:1134, registers the listening
 * socket edge-triggered on an epoll instance and passes every batch of ready
 * events to et(). Returns 0 after the event loop ends.
 */
int main()
{
	int ret = 0;
	struct sockaddr_in address;
	memset(&address,0,sizeof(address));
	address.sin_family = AF_INET;
	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
	address.sin_port = htons(1134);

	int listenfd = socket(AF_INET,SOCK_STREAM,0);
	assert(listenfd >= 0);

	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd,5);
	assert(ret != -1);

	struct epoll_event events[MAX_EVENT_NUMBER];
	int epollfd = epoll_create(5);
	assert(epollfd != -1);
	addfd(epollfd,listenfd,true);

	while(1){
		int ready = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
		if(ready < 0){
			if(errno == EINTR){
				/* A signal interrupted the wait; retry instead of exiting. */
				continue;
			}
			printf("epoll failure\n");
			break;
		}
		et(events,ready,epollfd,listenfd);
	}

	close(epollfd);
	close(listenfd);
	return 0;
}

  

多线程版本中 如果我们 连续多次输入会出现一个问题

如果我们连续多次在telnet的客户端输入

服务端会开启多个线程 接受这个socket的输入

显示如下:

fd: 5, sockfdget content: dfdsf
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd

如果多个线程同时操作一个socket 可能出现各种意外情况

那么我们就需要设置socket的EPOLLONESHOT标志

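对于注册了 EPOLLONESHOT 的 socket,操作系统最多只触发其上注册的一个可读、可写或异常事件,且只触发一次;处理完成后必须用 epoll_ctl 重新注册(重置)该事件,之后的事件才会再次被触发。这样可以保证任一时刻只有一个线程在处理该 socket。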

代码如下:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Per-thread argument: the epoll instance and the connection socket a worker
 * thread operates on. */
struct fds
{
   int epollfd;   /* epoll instance used to re-arm the socket */
   int sockfd;    /* connection socket the worker reads from */
};

/*
 * Re-arm `fd` on the epoll instance `epollfd` with
 * EPOLLIN | EPOLLET | EPOLLONESHOT, so that the next read event on it is
 * reported again after a one-shot event has been delivered.
 *
 * A failing epoll_ctl() is reported on stderr instead of being ignored.
 */
void reset_oneshot( int epollfd, int fd )
{
    struct epoll_event event;
    /* Zero the struct so the unused part of the data union is defined. */
    memset( &event, 0, sizeof( event ) );
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    if( epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ) == -1 )
    {
        perror( "epoll_ctl(EPOLL_CTL_MOD)" );
    }
}


void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d
", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '', BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection
" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later
" );
                break;
            }
        }
        else
        {
            printf( "get content: %s
", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d
", sockfd );
}

/*
 * Enable O_NONBLOCK on `fd`.
 *
 * Returns the flags `fd` had before the change, or -1 if reading or writing
 * the flags with fcntl() fails (errno is left as set by fcntl()).
 */
int setnonblocking(int fd)
{
	int old_option = fcntl(fd,F_GETFL);
	if(old_option == -1){
		/* Bail out rather than writing back flags derived from an error. */
		return -1;
	}
	if(fcntl(fd,F_SETFL,old_option | O_NONBLOCK) == -1){
		return -1;
	}
	return old_option;
}

/*
 * Register `fd` on `epollfd` for edge-triggered read events
 * (EPOLLIN | EPOLLET) and make it non-blocking.
 *
 * setOneShot - when true EPOLLONESHOT is added as well.
 *
 * An epoll_ctl() failure is printed to stderr rather than dropped.
 */
void addfd(int epollfd,int fd,bool setOneShot)
{
	struct epoll_event event;
	/* Clear the struct so the unused part of the data union is defined. */
	memset(&event,0,sizeof(event));
	event.data.fd = fd;
	event.events = EPOLLIN | EPOLLET;
	if(setOneShot){
		event.events |= EPOLLONESHOT;
	}

	/* Non-blocking first: once registered, readers loop until EAGAIN. */
	setnonblocking(fd);
	if(epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event) == -1){
		perror("epoll_ctl(EPOLL_CTL_ADD)");
	}
}



void et(epoll_event* events,int number,int epollfd,int listenfd){
	char buf[BUFFER_SIZE];
	for(int i = 0; i < number;i++){
		int sockfd = events[i].data.fd;
		if(sockfd == listenfd){
			struct sockaddr_in client_address;
			socklen_t client_addrlength = sizeof(client_address);
			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
			addfd(epollfd,connfd,true);
		}else if(events[i].events & EPOLLIN){
			/*
			printf("event trigger once
");
			while(1){
				memset(buf,'',BUFFER_SIZE);
				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
				if(ret < 0){
					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
						printf("read later
");
						break;
					}
					close(sockfd);
					break;
				}else if(ret == 0){
					close(sockfd);
				}else{
					printf("get %d bytes of content: %s 
",ret,buf);
				}
			}*/
			 pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
		}else{
			printf("something else happened 
");
		}
	}
}

/*
 * EPOLLONESHOT variant: listens on 127.0.0.1:1134, registers the listening
 * socket on an epoll instance without EPOLLONESHOT (addfd(..., false)) and
 * passes every batch of ready events to et(). Returns 0 after the event
 * loop ends.
 */
int main()
{
	int ret = 0;
	struct sockaddr_in address;
	memset(&address,0,sizeof(address));
	address.sin_family = AF_INET;
	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
	address.sin_port = htons(1134);

	int listenfd = socket(AF_INET,SOCK_STREAM,0);
	assert(listenfd >= 0);

	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd,5);
	assert(ret != -1);

	struct epoll_event events[MAX_EVENT_NUMBER];
	int epollfd = epoll_create(5);
	assert(epollfd != -1);
	addfd(epollfd,listenfd,false);

	while(1){
		int ready = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
		if(ready < 0){
			if(errno == EINTR){
				/* A signal interrupted the wait; retry instead of exiting. */
				continue;
			}
			printf("epoll failure\n");
			break;
		}
		et(events,ready,epollfd,listenfd);
	}

	close(epollfd);
	close(listenfd);
	return 0;
}

  

输入

Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
1
2
3
4
5
sdfufghbskdjhkbaskjhbkjlsadvfblkajwsvdfbljkashdbkfjhasdfhasjkhD

输出

start new thread to receive data on fd: 5
get content: 1

get content: 2
3
4

get content: 5
sdfufg
get content: hbskdjhkb
get content: askjhbkjl
get content: sadvfblka
get content: jwsvdfblj
get content: kashdbkfj
get content: hasdfhasj
get content: khD
hasj
read later
end thread receiving data on fd: 5

作 者: itdef
欢迎转帖 请保持文本完整并注明出处
技术博客 http://www.cnblogs.com/itdef/
B站算法视频题解
https://space.bilibili.com/18508846
qq 151435887
gitee https://gitee.com/def/
欢迎c c++ 算法爱好者 windows驱动爱好者 服务器程序员沟通交流
如果觉得不错,欢迎点赞,你的鼓励就是我的动力
阿里打赏 微信打赏
原文地址:https://www.cnblogs.com/itdef/p/6444742.html