epoll 惊群处理

  1 #include <sys/types.h>
  2 #include <sys/socket.h>
  3 #include <sys/epoll.h>
  4 #include <netdb.h>
  5 #include <string.h>
  6 #include <stdio.h>
  7 #include <unistd.h>
  8 #include <fcntl.h>
  9 #include <stdlib.h>
 10 #include <errno.h>
 11 #include <sys/wait.h>
 12 #include <unistd.h>
 13 #include <semaphore.h>
 14 #include <sys/shm.h>
 15 #define IP   "127.0.0.1"
 16 #define PORT  8888
 17 #define PROCESS_NUM 4
 18 #define MAXEVENTS 64
 19 
 20 static int
 21 create_and_bind ()
 22 {
 23     int fd = socket (PF_INET, SOCK_STREAM, 0);
 24     struct sockaddr_in serveraddr;
 25     serveraddr.sin_family = AF_INET;
 26     inet_pton (AF_INET, IP, &serveraddr.sin_addr);
 27     serveraddr.sin_port = htons (PORT);
 28     bind (fd, (struct sockaddr *) &serveraddr, sizeof (serveraddr));
 29     return fd;
 30 }
 31 
 32 static int
 33 make_socket_non_blocking (int sfd)
 34 {
 35     int flags, s;
 36     flags = fcntl (sfd, F_GETFL, 0);
 37     if (flags == -1)
 38     {
 39         perror ("fcntl");
 40         return -1;
 41     }
 42     flags |= O_NONBLOCK;
 43     s = fcntl (sfd, F_SETFL, flags);
 44     if (s == -1)
 45     {
 46         perror ("fcntl");
 47         return -1;
 48     }
 49     return 0;
 50 }
 51 
 52 void
 53 worker (int sfd, int efd, struct epoll_event *events, int k, sem_t * sem)
 54 {
 55     /* The event loop */
 56     struct epoll_event event;
 57     // struct epoll_event *events;
 58     efd = epoll_create (MAXEVENTS);
 59     if (efd == -1)
 60     {
 61         perror ("epoll_create");
 62         abort ();
 63     }
 64     int epoll_lock = 0;
 65     while (1)
 66     {
 67         int n, i;
 68         int s;
 69         event.data.fd = sfd;
 70         event.events = EPOLLIN;
 71         if (0 == sem_trywait (sem))
 72         {
 73             //拿到锁的进程将listen 描述符加入epoll
 74             if (!epoll_lock)
 75             {
 76                 fprintf (stderr, "%d  >>>get lock
", k);
 77                 s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
 78                 if (s == -1)
 79                 {
 80                     perror ("epoll_ctl");
 81                     abort ();
 82                 }
 83                 epoll_lock = 1;
 84             }
 85         }
 86         else
 87         {
 88             fprintf (stderr, "%d not lock
", k);
 89             //没有拿到锁的进程 将lisfd 从epoll 中去掉
 90             if (epoll_lock)
 91             {
 92                 fprintf (stderr, "worker  %d return from epoll_wait!
", k);
 93                 if (-1 == epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event))
 94                 {
 95                     if (errno == ENOENT)
 96                     {
 97                         fprintf (stderr, "EPOLL_CTL_DEL
");
 98                     }
 99                 }
100                 epoll_lock = 0;
101             }
102         }
103         //epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
104         // fprintf(stderr, "ok
");
105         //不能设置为-1  为了能让拿不到锁的进程再次拿到锁
106         n = epoll_wait (efd, events, MAXEVENTS, 300);
107         for (i = 0; i < n; i++)
108         {
109             if (sfd == events[i].data.fd)
110             {
111                 /* We have a notification on the listening socket, which means one or more incoming connections. */
112                 struct sockaddr in_addr;
113                 socklen_t in_len;
114                 int infd;
115                 char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
116                 in_len = sizeof in_addr;
117                 while ((infd = accept (sfd, &in_addr, &in_len)) > 0)
118                 {
119                     fprintf(stderr, "get one
");
120                     close (infd);
121                 }
122             }
123         }
124         if (epoll_lock)
125         {
126             //这里将锁释放
127             sem_post (sem);
128             epoll_lock = 0;
129             epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event);
130         }
131     }
132 }
133 
134 int
135 main (int argc, char *argv[])
136 {
137     int shmid;
138     sem_t *acctl;
139     //建立共享内存
140     shmid = shmget (IPC_PRIVATE, sizeof (sem_t), 0600);
141     acctl = (sem_t *) shmat (shmid, 0, 0600);
142     //进程间信号量初始化   要用到上面的共享内存
143     sem_init (acctl, 1, 1);
144     int sfd, s;
145     int efd;
146     // struct epoll_event event;
147     // struct epoll_event *events;
148     sfd = create_and_bind ();
149     if (sfd == -1)
150     {
151         abort ();
152     }
153     s = make_socket_non_blocking (sfd);
154     if (s == -1)
155     {
156         abort ();
157     }
158     s = listen (sfd, SOMAXCONN);
159     if (s == -1)
160     {
161         perror ("listen");
162         abort ();
163     }
164     efd = 0;
165     int k;
166     for (k = 0; k < PROCESS_NUM; k++)
167     {
168         printf ("Create worker %d
", k + 1);
169         int pid = fork ();
170         if (pid == 0)
171         {
172             struct epoll_event *events;
173             events = calloc (MAXEVENTS, sizeof (struct epoll_event));
174             worker (sfd, efd, events, k, acctl);
175             break;
176         }
177     }
178     int status;
179     wait (&status);
180     close (sfd);
181     return EXIT_SUCCESS;
182 }
183 /*
184  * 这里处理惊群 用到了进程的锁(信号量, 共享内存), 根据试验的结果多个进程时accept接收客户端连接的效率并没有提高太多
185  * 但是处理其他可读可写(非监听描述符)时, 要比单个进程要快很多。
186 */
187 
188  
View Code

  在早期的kernel中, 多线程或多进程调用accept就会出现如下情况, 当前多个进程阻塞在accept中, 此时有客户端连接时, 内核就会通知阻塞在accept的所有进程, 这时就会造成惊群现象, 也就是所有accept都会返回 但是只有一个能拿到有效的文件描述符, 其他进程最后都会返回无效描述符。但在linux kernel 版本2.6 以上时, accept惊群的问题已经解决, 大致方案就是选一个阻塞在accept的进程返回。

  但是在IO复用中, select/poll/epoll 还是存在这种现象,其原因就是这些阻塞函数造成了以上同样的问题。这里就给出了类似Nginx的解决方案, 给监听描述符竞争枷锁, 保证只有一个进程处理监听描述符。 这里还可以控制锁的频率,如果一个进程接受连接的数量达到一定数量就不再申请锁。  这里要注意的是epoll_create的位置要在fork之后的子进程中, 这是因为若在父进程中create 那么fork之后子进程保留这个描述符副本,epoll_create其实是内核中建立的文件系统 保存在内核中, 那么其子进程都会共用这个文件系统, 那么多任务的epoll_ctl就会出现问题。子进程中建立各自的epoll fd 就可以避免这种情况。

原文地址:https://www.cnblogs.com/MaAce/p/7755749.html