epoll

1. epoll基础

epoll - I/O event notification facility

epoll是一种I/O事件通知机制,这句话基本上包含了所有需要理解的要点:

I/O事件

基于file descriptor,支持file, socket, pipe等各种I/O方式。

当文件描述符关联的内核读缓冲区可读,则触发可读事件,什么是可读呢?就是内核缓冲区非空,有数据可以读取。

当文件描述符关联的内核写缓冲区可写,则触发可写事件,什么是可写呢?就是内核缓冲区不满,有空闲空间可以写入。

通知机制

通知机制,就是当事件发生的时候,去通知他。通知机制的反面,就是轮询机制

epoll是一种当文件描述符的内核缓冲区非空的时候,发出可读信号进行通知,当写缓冲区不满的时候,发出可写信号通知的机制。

注:epoll不能操作普通文件:epoll仅对通常在读取/写入时表现出阻塞行为的文件描述符(如管道和套接字)有意义。

普通文件描述符总是会立即或多或少地立即返回结果或文件结束,epoll_wait()会一直上报直到文件结束(读)或一直写,因此epoll操作无意义

  • epoll_create

#include <sys/epoll.h>

int epoll_create(int size);

size不用,0即可。

int epoll_create1(int flags);

  • epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

op: EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD;

typedef union epoll_data{
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
}epoll_data_t;

typedef epoll_event {
    uint32_t events; /*Epoll events*/
    epoll_data_t data; 
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
  • epoll_wait

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);

timeout: -1死等;0,不等;>0,有限等待时间(ms)。

成功:事件个数,失败-1。

2. 水平触发和边缘触发

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

水平触发只要缓冲区有数据就会一直触发,与select和poll相同。如同电路上只要高电平(1)或低电平(0)时就触发通知。

只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知;当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知。

LT模式支持阻塞和非阻塞两种方式。epoll默认的模式是LT。

边沿触发:只有在缓冲区增加数据的那一刻才会触发。如同电路上只有电平发生变化(高电平到低电平,或者低电平到高电平)的时候才触发通知。

当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知;当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知。

水平触发和边缘触发模式区别

1)读缓冲区刚开始是空的

2)读缓冲区写入2KB数据

3)水平触发和边缘触发模式此时都会发出可读信号。收到信号通知后,读取了1kb的数据,读缓冲区还剩余1KB数据,水平触发会再次进行通知,而边缘触发不会再进行通知。

所以,边缘触发需要一次性的把缓冲区的数据读完为止,也就是一直读,直到读到EGAIN为止,EGAIN说明缓冲区已经空了,因为这一点,边缘触发需要设置文件句柄为非阻塞。

//水平触发
ret = read(fd, buf, sizeof(buf));

//边缘触发
while(true) {
    ret = read(fd, buf, sizeof(buf);
    if (ret == EAGAIN) break;
}
使用ET的例子:nginx。
使用LT的例子:redis。

为什么边沿触发必须使用非阻塞IO?

阻塞IO:当你去读一个阻塞的文件描述符时,如果在该文件描述符上没有数据可读,那么它会一直阻塞(通俗一点就是一直卡在调用函数那里),直到有数据可读。当你去写一个阻塞的文件描述符时,如果在该文件描述符上没有空间(通常是缓冲区)可写,那么它会一直阻塞,直到有空间可写。以上的读和写我们统一指在某个文件描述符进行的操作,不单单指真正的读数据,写数据,还包括接收连接accept(),发起连接connect()等操作...

非阻塞IO:当你去读写一个非阻塞的文件描述符时,不管可不可以读写,它都会立即返回,返回成功说明读写操作完成了,返回失败会设置相应errno状态码,根据这个errno可以进一步执行其他处理。它不会像阻塞IO那样,卡在那里不动!!!

通常来说,et方式是比较危险的方式,如果要使用et方式,那么,应用程序应该

1、将socket设置为non-blocking方式

2、epoll_wait收到event后,read或write需要读到没有数据为止,write需要写到没有数据为止(对于non-blocking socket来说,EAGAIN通常是无数据可读,无数据可写的返回状态);

测试代码(参考:浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define MAX_BUFFER_SIZE        5
#define MAX_EPOLL_EVENTS    20    
#define EPOLL_LT    0
#define EPOLL_ET    1
#define FD_BLOCK    0
#define FD_NONBLOCK    1

int set_nonblock(int fd){
    int old_flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, old_flags | O_NONBLOCK);
    return old_flags;
}

// 注册文件描述符到epoll,并设置其事件为EPOLLIN
void addfd_to_epoll(int epfd, int fd, int epoll_type, int block_type){
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;

    if (epoll_type == EPOLL_ET){
        ev.events |= EPOLLET;
    }

    if (block_type == FD_NONBLOCK){
        set_nonblock(fd);
    }

    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

void epoll_lt(int sockfd){
    char buffer[MAX_BUFFER_SIZE];
    int ret;

    printf("-------------------LT recv...
");
    memset(buffer, 0, MAX_BUFFER_SIZE);
    ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);
    printf("recv bytes: %d
", ret);
    if (ret > 0){
/*        printf("recv bytes:");
        for(i = 0; i < MAX_BUFFER_SIZE; i++){
            printf("%2x ", buffer[i]);
        }
        printf("
");
*/
        printf("recv bytes: %s
", buffer);

    } else {
        if (ret == 0){
            printf("client close
");
        }
        close(sockfd);
    }
    printf("LT deal with over
");
}

void epoll_et_loop(int sockfd){
    char buffer[MAX_BUFFER_SIZE];
    int ret ;

    printf("-------------------ET loop recv...
");
    while(1){
        memset(buffer, 0, MAX_BUFFER_SIZE);
        ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);
        printf("ET loop recv bytes: %d
", ret);
        if (ret > 0){
            printf("ET loop recv %d bytes: %s
", ret, buffer);
        } else if (ret < 0){
            if ((errno == EAGAIN) || errno == EWOULDBLOCK){
                printf("ET loop recv all data...
");
                break;
            }
            close(sockfd);
            break;
        } else { //if (ret == 0){
            printf("client close
");
            close(sockfd);
            break;
        }
    }
    printf("ET loop deal with over
");
}

void epoll_et_nonloop(int sockfd){
    char buffer[MAX_BUFFER_SIZE];
    int ret ;

    printf("--------------------ET nonloop recv...
");
    memset(buffer, 0, MAX_BUFFER_SIZE);
    ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);

    printf("ET nonloop recv bytes: %d
", ret);
    if (ret > 0){
        printf("ET loop recv %d bytes: %s
", ret, buffer);
    } else if (ret < 0){
        close(sockfd);
    } else { //if (ret == 0){
        printf("client close
");
        close(sockfd);
    }

    printf("ET nonloop deal with over
");
}

void epoll_process(int epfd, struct epoll_event *events, int number, 
        int sockfd, int epoll_type, int block_type)
{
    struct sockaddr_in client_addr;
    socklen_t client_addrlen;
    int newfd, confd;
    int i;

    for (i = 0; i <number; i++){
        newfd = events[i].data.fd;
        if (newfd == sockfd){
            printf("---------------accept()-------------
");
       // 模拟服务器繁忙,无法立即accept() printf(
"sleep 3s... "); sleep(3); printf("sleep 3s over "); client_addrlen = sizeof(client_addr); confd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addrlen); printf("confd= %d ", confd); addfd_to_epoll(epfd, confd, epoll_type, block_type); printf("accept() over!!! "); } else if (events[i].events & EPOLLIN){ if (epoll_type == EPOLL_LT){ epoll_lt(newfd); } else if (epoll_type == EPOLL_ET){ epoll_et_loop(newfd); //epoll_et_nonloop(newfd); } } else { printf("other events... "); } } } void err_exit(char *msg){ perror(msg); exit(1); } int create_socket(const char *ip, const int portnumber){ struct sockaddr_in server_addr; int sockfd , reuse = 1; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(portnumber); if(inet_pton(PF_INET, ip, &server_addr.sin_addr)== -1){ err_exit("inet_pton() error"); } if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1){ err_exit("socket() error"); } if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1){ err_exit("setsockopt() error"); } if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){ err_exit("bind() error"); } if (listen(sockfd, 5) == -1){ err_exit("listen() error"); } return sockfd; } int main(int argc, char *argv[]) { if (argc < 3){ fprintf(stderr, "usage: %s ip_address port_number ", argv[0]); exit(1); } int sockfd, epfd, number; sockfd = create_socket(argv[1], atoi(argv[2])); struct epoll_event events[MAX_EPOLL_EVENTS]; if ((epfd = epoll_create1(0)) == -1){ err_exit("epoll_create() error"); } // 以下设置针对监听的sockfd,当epoll_wait返回时,必定有事件发生 // 所以这里忽略罕见的情况外,设置阻塞IO没有意义,我们设置非阻塞IO // // 水平触发 非阻塞 addfd_to_epoll(epfd, sockfd, EPOLL_LT, FD_NONBLOCK); // 边缘触发 非阻塞 //addfd_to_epoll(epfd, sockfd, EPOLL_ET, FD_NONBLOCK); while(1){ number = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, -1); if (number == -1){ err_exit("epoll_wait() error"); } else{ // 水平触发 阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_LT, FD_BLOCK); // 水平触发 非阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_LT, FD_NONBLOCK); // 边缘触发 阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_ET, FD_BLOCK); // 边缘触发 非阻塞 epoll_process(epfd, events, number, sockfd, EPOLL_ET, FD_NONBLOCK); } } close(sockfd); return 0; }

水平触发、边缘触发与阻塞和非阻塞的组合如下:

水平触发阻塞sockfd,边缘触发阻塞sockfd:针对sockfd,当epoll_wait()返回时总有事件发生,除特殊情况外,设置阻塞IO没有意义,一般设置sockfd为非阻塞IO。

水平触发非阻塞sockfd:不会丢连接,因为只要有连接未读取,就会触发epoll_wait()。

边缘触发非阻塞sockfd:会丢失连接,因为可能不会一次处理完或读取所有客户端连接。

水平触发阻塞confd:可以正确读取完数据。

水平触发非阻塞confd:可以正确读取完数据,与阻塞效果相同。

边缘触发阻塞confd:每次数据改变均触发一次epoll_wait(),若本次未读取完,下次触发时继续接着读取原来的数据,所以必须一次性读取完数据;一次读取完(循环读取)数据时会阻塞,导致程序不能继续运行(不能accept或epoll)。

边缘触发非阻塞confd:一次性读写完数据,不丢失数据,不阻塞。

结论:

1.对于监听的sockfd,最好使用水平触发模式(非阻塞),边缘触发模式会导致高并发情况下,有的客户端会连接不上。如果非要使用边缘触发,网上有的方案是用while来循环accept()。

2.对于读写的connfd,水平触发模式下,阻塞和非阻塞效果都一样,不过为了防止特殊情况,还是建议设置非阻塞。

3.对于读写的connfd,边缘触发模式下,必须使用非阻塞IO,并要一次性全部读写完数据。

经典示例

#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

void setnonblocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

int main(int argc, char* argv[])
{
    int i, listenfd, connfd, sockfd,epfd,nfds, portnumber;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;


    if ( 2 == argc )
    {
        if( (portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr,"Usage:%s portnumber
",argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr,"Usage:%s portnumber
",argv[0]);
        return 1;
    }

    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev,events[20];

    //生成用于处理accept的epoll专用的文件描述符
    epfd=epoll_create(1);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    //把socket设置为非阻塞方式
    setnonblocking(listenfd);

    //设置与要处理的事件相关的文件描述符
    ev.data.fd=listenfd;
    //设置要处理的事件类型  水平触发非阻塞
    ev.events=EPOLLIN;

    //注册epoll事件
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr="127.0.0.1";
    inet_aton(local_addr,&(serveraddr.sin_addr));
    serveraddr.sin_port=htons(portnumber);
    bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    for ( ; ; ) {
        //等待epoll事件的发生
        nfds=epoll_wait(epfd, events, 20, 500);

        //处理所发生的所有事件
        for(i=0;i<nfds;++i)
        {
            if(events[i].data.fd==listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
            {
                connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }

                setnonblocking(connfd);
                char *str = inet_ntoa(clientaddr.sin_addr);
                printf("accapt a connection from %s
", str);
                //设置用于读操作的文件描述符
                ev.data.fd=connfd;
                //设置用于注测的读操作事件, 边缘触发非阻塞
                ev.events=EPOLLIN|EPOLLET;

                //注册ev
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }
            else if(events[i].events&EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。
            {
                printf("EPOLLIN
");
                if ( (sockfd = events[i].data.fd) < 0)
                    continue;
                while(1){
                    memset(line, 0, MAXLINE);
                    n = read(sockfd, line, MAXLINE-1);
                    if (n > 0){
                        printf("read(%ld): %s
", n, line);
                    } else if( n < 0){
                        if ((errno == EAGAIN) || errno == EWOULDBLOCK){
                            printf("read over
");

                            //设置用于写操作的文件描述符
                            ev.data.fd=sockfd;
                            //设置用于注测的写操作事件
                            ev.events=EPOLLOUT|EPOLLET;
                            //修改sockfd上要处理的事件为EPOLLOUT
                            epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                            break;
                        }
                        //} else if (errno == ECONNRESET) {
                        close(sockfd);
                        events[i].data.fd = -1;
                        printf("readline error
");
                        break;
                    } else { //if (n == 0) {
                        close(sockfd);
                        events[i].data.fd = -1;
                        break;
                    }
                }
            } 
            else if(events[i].events&EPOLLOUT) // 如果有数据发送
            {
                sockfd = events[i].data.fd; // 应该用struct管理fd和数据line
                write(sockfd, line, n);
                //设置用于读操作的文件描述符
                ev.data.fd=sockfd;
                //设置用于注册读操作事件
                ev.events=EPOLLIN|EPOLLET;
                //修改sockfd上要处理的事件为EPOLIN
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
            }
        }
    }

    return 0;
}

附:EPOLLRDHUP处理 

在对系统问题进行排查时,我发现了一个奇怪的现象:明明是对方断开请求,系统却报告一个查询失败的错误,但从用户角度来看请求的结果正常返回,没有任何问题。

对这个现象深入分析后发现,这是一个基于 epoll 的连接池实现上的问题,或者说是特性 。

首先解释一下导致这个现象的原因。

在使用 epoll 时,对端正常断开连接(调用 close()),在服务器端会触发一个 epoll 事件。在低于 2.6.17 版本的内核中,这个 epoll 事件一般是 EPOLLIN,即 0x1,代表连接可读。

连接池检测到某个连接发生 EPOLLIN 事件且没有错误后,会认为有请求到来,将连接交给上层进行处理。这样一来,上层尝试在对端已经 close() 的连接上读取请求,只能读到 EOF,会认为发生异常,报告一个错误。

因此在使用 2.6.17 之前版本内核的系统中,我们无法依赖封装 epoll 的底层连接库来实现对对端关闭连接事件的检测,只能通过上层读取数据时进行区分处理。

不过,2.6.17 版本内核中增加了 EPOLLRDHUP 事件,代表对端断开连接,关于添加这个事件的理由可以参见 “[RFC] epoll and half closed TCP connections”。

在使用 2.6.17 之后版本内核的服务器系统中,对端连接断开触发的 epoll 事件会包含 EPOLLIN | EPOLLRDHUP,即 0x2001。有了这个事件,对端断开连接的异常就可以在底层进行处理了,不用再移交到上层。

重现这个现象的方法很简单,首先 telnet 到 server,然后什么都不做直接退出,查看在不同系统中触发的事件码。

注意,在使用 2.6.17 之前版本内核的系统中,sys/epoll.h 的 EPOLL_EVENTS 枚举类型中是没有 EPOLLRDHUP 事件的,所以带 EPOLLRDHUP 的程序无法编译通过。

参考:http://www.linuxidc.com/Linux/2016-04/129819.htm

参考:

1. epoll使用详解 简书

2. 处理大并发之二 对epoll的理解,epoll客户端服务端代码

3. http://www.linuxidc.com/Linux/2016-04/129819.htm

4.浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

5.Linux Epoll介绍和程序实例

6. linux 网络编程 epoll libevent 高并发epoll示例

原文地址:https://www.cnblogs.com/embedded-linux/p/5023862.html