一个epoll事件实现的高并发服务/客户端(C语言实现,服务端存储基于hashtable)

  代码路径:https://github.com/prophetss/epoll-event

  之前实现了一个简单高效的hashtable(点这里),这里结合epoll基于reactor模型实现了一个高并发服务器。大体思想是将epoll接到的每一个服务请求存储到hashtable里来管理,每一个请求都可以设置独立的回调函数。具体可以先看代码,注释已经写得很详细。代码实现了一个简单server端实例,client端我写了两个例子,一个为Linux C实现,一个是Python实现(主要为了可以在不同系统跑)。C client是fork了产生1024个进程来模仿高并发请求,在同一台机子上测试结果平均每秒读响应34万多次,参考结果如下:

Python client是创建1000个线程来分别不断发送数据,在局域网内两台机子上测试平均结果为每秒31000次左右。比上面慢的多,分析主要原因应该是网络IO瓶颈造成的,参考结果如下:

最后将client代码贴到下面充下长度..

client(C):

  1 #include <unistd.h>
  2 #include <sys/mman.h>
  3 #include <netinet/in.h>
  4 #include <semaphore.h>
  5 #include <sys/types.h>
  6 #include <sys/socket.h>
  7 #include <arpa/inet.h>
  8 #include <fcntl.h>
  9 #include <wait.h>
 10 #include <string.h>
 11 #include <netdb.h>
 12 #include <stdlib.h>
 13 #include "error.h"
 14 
 15 
 16 /*进程共享文件用于统计创建进程个数*/
 17 #define PFILE_NAME    "count"
 18 
 19 /*需要创建的进程数*/
 20 #define PROCESS_NUM        100
 21 
 22 /*每个进程请求次数*/
 23 #define REQUEST_TIMES    10000
 24 
 25 
 26 struct shared {
 27     sem_t mutex;    /*信号量用于加锁*/
 28     int count;        /*进程个数*/
 29 } shared;
 30 
 31 
 32 void request(const char *server_ip, int server_port)
 33 {
 34     struct sockaddr_in client_addr;
 35     bzero(&client_addr, sizeof(client_addr));
 36     client_addr.sin_family = AF_INET;
 37     client_addr.sin_addr.s_addr = INADDR_ANY;
 38     client_addr.sin_port = htons(0);
 39     
 40     int client_socket = socket(AF_INET, SOCK_STREAM, 0);
 41     if(client_socket < 0) exit_throw("create client socket fail");
 42     
 43     struct sockaddr_in server_addr;
 44     bzero((char *)&server_addr, sizeof(server_addr));
 45 
 46     server_addr.sin_family = AF_INET;
 47 
 48     struct hostent *server = gethostbyname(server_ip);
 49     if(!server) exit_throw("fail to get host name");
 50 
 51     bcopy((char *)server->h_addr, (char *)&server_addr.sin_addr.s_addr, server->h_length);
 52 
 53     server_addr.sin_port = htons(server_port);
 54     socklen_t server_addr_len = sizeof(server_addr);
 55 
 56     if(connect(client_socket, (struct sockaddr*) &server_addr, server_addr_len) == -1 ) {
 57         exit_throw("connent to server fail");
 58     }
 59 
 60     int pid = getpid();
 61 
 62     char content[64] = {0};
 63     sprintf(content, "%s, pid:%d
", "i am client!", pid);
 64     for (int i = 0; i < REQUEST_TIMES; ++i) {
 65         send(client_socket, content, strlen(content), 0);
 66         usleep(10000);    //10ms
 67     }
 68 
 69     close(client_socket);
 70 
 71     exit(0);
 72 }
 73 
 74 /*
 75  * 参数1为serverip,参数2为server端口号
 76  */
 77 int main(int argc,char *argv[])
 78 {
 79     if(argc != 3) exit_throw("parameter error!
");
 80 
 81     char *server_ip = argv[1];
 82     int server_port = atoi(argv[2]);
 83 
 84     struct shared *psh;
 85 
 86     /*创建共享文件*/
 87     int fd = open(PFILE_NAME, O_RDWR | O_CREAT | O_TRUNC, 0666);
 88     /*初始化0*/
 89     int ret_len = write(fd, &shared, sizeof(struct shared));
 90     if (ret_len != sizeof(struct shared)) {
 91         exit_throw("write error!
");
 92     }
 93     /*映射内存*/
 94     psh = mmap(NULL, sizeof(struct shared), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
 95     close(fd);
 96 
 97     sem_init(&psh->mutex, 1, 1);
 98     
 99     int i, pid[PROCESS_NUM];
100     for (i = 0; i < PROCESS_NUM; i++) {
101         pid_t fpid = fork();
102         if (0 == fpid) {
103             pid[i]=getpid();
104             sem_wait(&psh->mutex);
105             psh->count++;
106             printf("%d processes was created!
", psh->count);
107             sem_post(&psh->mutex);
108             request(server_ip, server_port);
109         }
110         else if (fpid > 0) {
111         }
112         else {
113             exit_throw("fork error!");
114         }
115     }
116 
117     /*等待所有子进程创建完毕*/
118     while (psh->count < PROCESS_NUM) {
119         sleep(0);
120     }
121 
122     wait(NULL);
123 
124     remove(PFILE_NAME);
125     
126     printf("exit all!
");
127 
128     return 0;
129 }

client(Python ):

 1 # -*- coding: UTF-8 -*-
 2 
 3 import threading
 4 import random
 5 import socket
 6 import time
 7 import sys
 8 
 9 class tThread (threading.Thread):
10     counter = 0
11     def __init__(self, threadID, name, ip, port):
12         threading.Thread.__init__(self)
13         self.threadID = threadID
14         self.name = name
15         self.ip = ip
16         self.port = port
17     def run(self):
18         print("开始线程:", self.name+str(self.threadID))
19         sendData(self.name, self.threadID, self.counter, self.ip, self.port)
20         print("退出线程:", self.name+str(self.threadID))
21 
22 def sendData(name, threadID, counter, ip, port):
23     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24     s.connect((ip, port))
25     for i in range(1000000):
26         counter = counter + 1
27         current_time = time.asctime()
28         try:
29             s.send(("%d times: i am %s and now is %s.
"%(counter, str(name)+str(threadID), str(current_time))).encode('utf-8'))
30         except Exception as e:
31             pass
32     s.close()
33 
34 '''
35     参数1:server端ip
36     参数2:server端口号
37 '''
38 if __name__ == '__main__':
39     threads = []
40     for i in range(1000):
41         t = tThread(i, "Thread", sys.argv[1], int(sys.argv[2]))
42         t.start()
43         threads.append(t)
44     for x in threads:
45         x.join()

 2018-07-08 19:29:25更新:添加测试

原文地址:https://www.cnblogs.com/prophet-ss/p/9180977.html