Linux高性能server规划——多进程编程


多进程编程


多进程编程包含例如以下内容:


  1. 复制进程影映像的fork系统调用和替换进程映像的exec系列系统调用。

  2. 僵尸进程以及怎样避免僵尸进程

  3. 进程间通信(Inter-Process CommunicationIPC)最简单的方式:管道

  4. 3种进程间通信方式:信号量,消息队列和共享内存


fork系统调用


#include<unistd.h>


pid_tfork(void);


该函数的每次都用都返回两次,在父进程中返回的是子进程的PID,在子进程中返回的是0.该返回值是兴许代码推断当前进程是父进程还是子进程的根据。

fork调用失败是返回-1,并设置errno


fork函数复制当前进程,在内核进程表中创建一个新的进程表项。新的进程表项有非常多属性和原进程同样,比方堆指针、栈指针和标志寄存器的值。但也有很多属性被赋予新的值,比方该进程的PPID被设置成原进程的PID,信号位图被清除(原进程设置的信号处理函数不在对新进程起作用)。


此外。创建子进程后,父进程中打开的文件描写叙述符默认在子进程也是打开的,且文件描写叙述符的引用计数加1

不仅如此,父进程的用户根文件夹、当前工作文件夹等变量的引用计数均会加1


exec系统系统调用


有时候我们须要在子进程中运行其它程序。即替换当前进程映像,这就须要使用例如以下exec系列函数之中的一个:


#include <unistd.h>


extern char **environ;


int execl(const char *path, const char*arg, ...);


int execlp(const char *file, const char*arg, ...);


int execle(const char *path, const char*arg,..., char * const envp[]);


int execv(const char *path, char *constargv[]);


int execvp(const char *file, char *constargv[]);


int execvpe(const char *file, char *constargv[], char *const envp[]);


         path參数制定能够运行文件的完整路径。file參数能够接受文件名称,该文件的详细位置则在环境变量PATH中搜寻。arg接受可变參数,argv则接受參数数组,他们都会被传递给新程序的main函数。envp參数用于设置新程序的环境变量。

假设未设置。则新程序将使用由全局变量environ指定的环境变量。


         普通情况下,exec函数时不返回的。除非出错。它出错时返回-1,并设置errno

假设没出错。则远程中exec调用之后的代码都不会运行,由于此时原程序已经被exec指定的程序全然替换(包含代码和数据)。


         exec函数不会关闭原程序打开的文件描写叙述符,除非该文件描写叙述符被设置了类似SOCK_CLOEXEC的属性。


处理僵尸进程


对于多进程程序而言,父进程一般须要跟踪子进程的退出状态。因此,当子进程结束执行时,内核不会马上释放该进程的进程表表项。以满足父进程兴许对该子进程退出信息的查询。在子进程结束执行之后。父进程读取器退出状态之前,我们称该子进程处于僵尸态。若父进程结束或者异常终止,而子进程继续执行,此时子进程的PPID将被系统设置为1,即init进程。init进程接管了该子进程。并等待它结束。


子进程停留在僵尸态。占领内核资源,这是绝对不同意的。毕竟内核资源有限。以下这对函数在父进程中调用。以等待子进程结束,并获取子进程的返回信息,从而避免了僵尸进程的产生,或者使子进程的僵尸态马上结束:


#include<sys/types.h>


#include<sys/wait.h>


pid_t wait(int*status);


pid_twaitpid(pid_t pid, int *status, int options);


wait函数将堵塞进程,直到该进程的某个子进程结束执行为止。

它返回结束执行的子进程的PID,并将子进程的退出状态信息(比方WIFEXITED        表示假设子进程正常结束,它就返回一个非0)存储于status參数指向的内存中。


wait函数的组设特性显然不是server程序期望的。而waitpid函数攻克了这个问题。waitpid仅仅等待由pid參数指定的子进程,假设pid取值为-1,则和wait函数同样,即等待随意一个子进程结束。options的參数能够控制waitpid的行为,当该參数取值为WNOHANG时。waitpid将是非堵塞的:假设pid指定的目标子进程还没有结束或意外终止,则waitpid马上返回0;假设目标子进程确实正常退出了,则waitpid返回该子进程的PID。失败返回-1,并设置errno


要在事件已经发生的情况下运行非堵塞调用才干提高程序的效率。对waitpid函数而言。我们最好在某个子进程退出之后再调用它。那么父进程从何得知某个子进程已经退出了?这正是SIGCHLD信号的用途。

当一个进程结束时,它将给其父进程发送一个SIGCHLD信号。因此,我们能够在父进程中捕获SIGCHLD信号。并在信号处理函数中调用waitpid函数以彻底结束一个子进程。例如以下所看到的:


static voidhandle-child( int sig)


{


         pid_t pid;


         int stat;


         while((pid = waitpid(-1, &stat,WNOHANG)) > 0)


{


         //对子进程进行善后处理


}


}


管道


pipe用于创建管道。管道是父进程和子进程间的经常使用通信手段。管道能在父、子进程之间传递数据,利用的是fork调用之后两个管道文件描写叙述符(fd[0]fd[1])都保持打开。一堆这种文件描写叙述符仅仅能保证父、子进程间一个方向的传输数据,父进程和子进程必须有一个关闭fd[0[,还有一个关闭fd[1]


要实现父、子进程之间的双向传输数据。就必须使用两个管道。socket编程接口提供了一个创建全双工管道的系统调用:socketpair


信号量


当多个进程表同一时候訪问系统上的某个资源的时候,比方同一时候写一个数据库的某条记录,或者同一时候改动某个文件。就须要考虑进城的同步问题,以确保任一时刻仅仅有一个进程能够拥有对资源的独占式訪问。

通常,程序对共享资源的訪问的代码仅仅是非常短的一段,你就是这一段代码引发了进程之间的竞态条件。

我们称这段代码为关键代码段。或者临界区。


信号量是一种特殊的变量,它仅仅能取自然数并仅仅支持两种操作:等待(wait)和信号(signal),这两种操作更常见的称呼是PV操作。

如果有信号量SV,则对它的PV操作含义例如以下:


P(SV),假设SV的值大于0,就将它减1:;假设SV的值为0,则挂起进程的运行。


V(SV)。假设有其它进程由于等待SV二挂起,。则换星之。假设没有。则将SV1


信号口粮的却仅仅能够是不论什么自然是,但最经常使用的、最简单的信号量是二进制信号量,它仅仅能取01两个值。


信号量API主要包括3个系统调用:semgetsemopsemctl。它们都被设计为操作一组信号量。即信号量集,而不是单个信号量。


semget系统调用


semget系统调用创建一个新的信号量集,或者获取一个已经存在的信号量集。


#include <sys/types.h>


#include <sys/ipc.h>


#include <sys/sem.h>


int semget(key_t key, int nsems, intsemflg);


key參数是一个键值,用来标识一个全局唯一的信号量集,就像文件名称全局唯一地标识一个文件一样。要通过信号量通信的进程须要使用同样的键值来创建/获取该信号量。


nsems參数指定要创建/获取的信号量集中信号量的数目。假设是创建信号量、则该值必须被指定;假设是获取已经存在的信号量,则能够把它设置为0


semflg參数指定一组标志。它低端的9个比特是该信号量的权限,其格式和含义都与系统调用openmode參数同样。此外。它还能够和IPC_CREATE标志做按位或运算创建新的信号量集。

此时即使信号量已经存在。semget也不会产生错误。

还能够联合使用IPC_CREATEIPC_EXCL标志来确保创建一组新的、唯一的信号量集。在这样的情况下,假设信号量集已经存在。则semget返回错误并设置errnoEEXIT


semget成功时返回一个正整数值。它是信号量集的标示符;semget失败时返回-1,并设置errno


semop系统调用


semop系统调用改变信号量的值,即运行PV操作。

在讨论semop之前。我们须要先介绍与每一个信号量关联的一些和总要的内核变量:


unsigned shortsemval;                                        //信号量的值


unsigned shortsemzcnt;                                               //等待信号量值变为0的进程数量


unsigned shortsemncnt;                                     //等待信号量值添加的进程数量


pid_t sempid;                                                          //最后一次运行semop操作的进程ID


semop对信号量的操作实际上就是对这些内核变量的操作。

semop的定义例如以下:


#include<sys/types.h>


#include<sys/ipc.h>


#include<sys/sem.h>


int semop(intsemid, struct sembuf *sops, unsigned nsops);


当中。semid參数是由semget调用返回的信号量集标示符。用于指定被操作的目标信号量集。sops參数指向一个sembuf结果类型的数组,sembuf结果体的定义例如以下:


struct sembuf{


         unsigned short int sem_num;


         short int sem_op;


         short int sem_flag;


};


当中,sem_num成员是信号量集中信号的编号,0表示信号量集中的第一个信号量。sem_op成员指定操作类型,其可选值为正整数。0和负整数。每种类型的操作的行为又受到sem_flag成员的影响。sem_flag的可选值是IPC_NOWAITSEM_UNDO

IPC_NOWAIT含义是。不管信号量操作是否成功,semop调用都将马上返回,这类似非堵塞I/O操作。SEM_UNDO的含义是,当进程时取消正在进行的semop操作。详细来说,sem_opsem_flag将依照例如以下方式影响semop的行为:


假设sem_op大于0,则semop将被操作的信号量的值semval添加sem_op。该操作要求调用进程对被操作信号量集拥有写权限,此时若设置了SEM_UNDO标志,则系统将更新进程的semadj变量(用于跟踪对信号量的改动情况)。


假设sem_op等于0,则表示这是一个等待0操作。该操作要求进程对被操作的信号量集拥有读权限。

假设此时信号量的值是0,则调用马上返回。

假设信号量的值不是0,则semop依据sem_flag的设置情况运行失败返回或者堵塞以等待信号量变为0


假设sem_op小于0。则表示对信号量值进行减操作。即期望获得信号量。该操作要求进程对操作信号量集拥有写权限。假设信号量的值semval大于或等于sem_op的绝对值。则semop操作成功。调用进程马上获得信号量。而且系统将该信号量的semval值减去sem_op的绝对值。此时假设设置了SEM_UNDO标志,则系统将更新进程的semadj变量。假设信号量的值小于sem_op的绝对值值。则semop依据sem_flag的设置情况运行失败返回或者堵塞以等待信号量变可用。


semop系统调用的第3个參数num_sem_ops指定要运行的操作个数。即sem_ops数组中元素的个数。semop对数组sops中的每一个成员依照数组顺序依次运行操作,而且该过程是院子操作,以避免别的进程在同一时刻依照不同的顺序对该信号集中的信号量运行semop操作导致的竞态条件。


semop成功时返回0。失败则返回-1并设置errno。失败的时候。sops中数组指定全部操作都不被运行。


semctl系统调用


semctl系统调用同意调用者对信号量进行直接控制。


#include<sys/types.h>


#include<sys/ipc.h>


#include<sys/sem.h>


int semctl(intsemid, int semnum, int cmd, ...);


semid參数是由semget调用返回的信号量标识符,用以指定被操作的信号量集。semnum參数指定被操作的信号量在信号量集中的编号。cmd參数指定要运行的命令。

有的命令须要调用者传递第四个參数(比如cmdIPC_RMID时,表示马上一处信号量集,唤醒全部等待该信号量集的进程,须要第四个參数)。


一般第四个參数的推荐格式例如以下:


union semun {


               int              val;    /* Value for SETVAL */


               struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */


               unsigned short  *array; /* Array for GETALL, SETALL */


               struct seminfo  *__buf; /* Buffer for IPC_INFO


                                          (Linux-specific) */


           }


semtl成功时返回值取决于cmd參数,失败是返回-1并设置errno


特殊键值IPC_PRIVATE


semget调用者能够给其Key參数传递一个特殊的键值IPC_PRICATE(其值为0),这样不管该信号量是否已经存在,semget都将创建一个新的信号量。使用该键值创建的信号量并不是像其它的名字声称的那样是进程私有的。其它进程,尤其是子进程。也有方法来訪问这个信号量。例如以下展示了使用IPC_PRIVATE信号量的演示样例:


#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

union semun
{
     int val;                  
     struct semid_ds* buf;     
     unsigned short int* array;
     struct seminfo* __buf;    
};

void pv( int sem_id, int op )
{
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = op;
    sem_b.sem_flg = SEM_UNDO;
    semop( sem_id, &sem_b, 1 );
}

int main( int argc, char* argv[] )
{
    int sem_id = semget( IPC_PRIVATE, 1, 0666 );

    union semun sem_un;
    sem_un.val = 1;
    semctl( sem_id, 0, SETVAL, sem_un );

    pid_t id = fork();
    if( id < 0 )
    {
        return 1;
    }
    else if( id == 0 )
    {
        printf( "child try to get binary sem
" );
        pv( sem_id, -1 );
        printf( "child get the sem and would release it after 5 seconds
" );
        sleep( 5 );
        pv( sem_id, 1 );
        exit( 0 );
    }
    else
    {
        printf( "parent try to get binary sem
" );
        pv( sem_id, -1 );
        printf( "parent get the sem and would release it after 5 seconds
" );
        sleep( 5 );
        pv( sem_id, 1 );
    }

    waitpid( id, NULL, 0 );
    semctl( sem_id, 0, IPC_RMID, sem_un );
    return 0;
}


运行结果例如以下:


parenttry to get binary sem


parentget the sem and would release it after 5 seconds


childtry to get binary sem


childget the sem and would release it after 5 seconds


共享内存


共享内存是最高效的IPC机制,由于它不涉及进程之间的不论什么传输数据。这样的高效带来的问题是,我们必须使用其它辅助手段来同步进程对内存的訪问。否则会产生竞态条件。因此,共享内存通常和其它进程间通信方式一起使用。


Linux共享内存的API都定义在sys/shm.h头文件里,包含4个系统调用:shmgetshmatshmdtshmctl


shmget系统调用


shmget系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。

其定义例如以下:


#include <sys/ipc.h>


#include <sys/shm.h>


int shmget(key_t key, size_t size, intshmflg);


shmget系统调用一样。key參数是一个键值,用来标识一段全局唯一的共享内存。

size參数指定共享的大小。单位是字节。假设是创建新的共享内存,则size值必须被指定。

假设是获取已经存在的共享内存,则能够把size设置为0


shmflg參数的使用和含义与semget系统调用的semflag參数同样,只是shmget支持两个额外的标志——SHM_HUGETLBSHM_NORESERVE。他们的含义例如以下:


SHM_HUGETLB。类似于mmapMAP_HUGETLB标志。系统将使用“大页面“来为共享分配空间。


SHM_NORESERVE,类似于mmapMAP_NORESERVE标志,不为共享内存保留交换分区。这样。当物理内存不足时,对共享内存运行写操作将触发SIGSEGV信号。


shmget成功时返回一个正整数,它是共享内存的标识符。

shmget失败时返回-1,并设置errno


假设shmget用于创建共享内存,则这段共享内存的全部字节都被初始化为0,与之关联的内核数据结构shmid_ds将被创建并初始化。


shmatshmdt系统调用


共享内存被创建/获取之后。我们不能马上訪问它。而是须要先将它关联到进程的地址空间中。

使用完共享内存之后,我们也须要将它从地址空间中分离。

这两项任务分别由例如以下两个系统调用实现:


#include <sys/types.h>


#include <sys/shm.h>


void *shmat(int shmid, const void *shmaddr,int shmflg);


int shmdt(const void *shmaddr);


当中shmid參数是由shmget调用返回的共享内存标识符。shmaddr參数指定共享内存关联到进程的哪块地址空间。终于的效果还是受到shmflg參数的可选标志SHM_RND的影响:


  1. 假设shmaddrNULL,则被关联的地址由操作系统选择。这是推荐的做法。以确保可移植性。

  2. 假设shmadd非空,而且SHM_RND标志未被设置,则共享内存关联到addr指定的地质处。

  3. 假设shm_addr非空。而且设置了SHM_RND标志。则被关联的地址是[ shmaddr - (shmadrr % SHMLBA)]SHMLBA的含义是“段低端边界地址倍数“,它必须是内存页面大小的整数倍。

    如今在Linux中,等于一个内存页大小,SHM_RND的含义是将共享内存被关联的地址向下圆整到离shm_addr近期的SHMLBA的证书倍地质处。

    除了SHM_RND标志外,shmflg參数还支持例如以下标志:

    SHM_RDONLY:进程仅能读取共享内存中的内容。假设没有该标志,则进程能够同一时候对共享内存读写。

    SHM_REMAP:假设shmaddr已经被关联到一段共享内存上,则冲向关联。

    SHM_EXEC:它指定对共享内存段的运行权限。

    shmat成功时返回共享内存被关联到的地址,失败是则返回(void*-1并设置errno

    shmdt函数将关联到shm_addr处的共享内存分离。成功时返回0,失败则返回-1并设置errno


shmctl系统调用


shmctl系统调用控制共享内存的某些属性。

#include<sys/ipc.h>


#include <sys/shm.h>


int shmctl(int shmid, int cmd, structshmid_ds *buf);


shmid參数是由shmget调用返回的共享内存标识符。cmd參数指定要运行的命令。

shmctl成功时的返回值取决于cmd參数,失败时返回-1,并设置errno


共享内存的POSIX方法


我们介绍过mmap函数。利用Map_ANONYMOUS标志我们能够实现父、子进程之间的匿名内存共享。通过打开同一个文件。mmap也能够实现无关进程之间的内存共享。Linux提供了第二种利用mmap在无关进程之间共享内存的方式。这样的方式无须不论什么文件的支持。但它须要先使用例如以下函数来创建或打开一个POSIX共享内存对象:


#include<sys/mman.h>


#include <sys/stat.h>        /* For mode constants */


#include<fcntl.h>           /* For O_*constants */


intshm_open(const char *name, int oflag, mode_t mode);


shm_open的用法与open系统调用全然同样。name參数指定要创建/打开的共享内存对象。从可移植的角度考虑。该參数应该使用”/somename”的格式:以”/”開始。后接多个字符,且这些字符都不是”/”。以””结尾,长度不超过NAME_MAX


oflag指定创建方式。

它能够是下列标志中的一个或者多个的按位或:


O_RDONLY。以制度方式打开共享内存对象。


O_RDWR。以可读、可写方式打开共享内存对象。


O_CREAT。假设共享内存对象不存在,则创建之。此时mode參数的最低9位将制定该共享内存对象的訪问权限。共享内存对象被创建的时候,其初试长度为0


O_EXCL

O_CREAT一起使用。假设由name指定的共享内存对象已经存在。则shm_open调用返回错误。否则就创建一个新的共享内存对象。


O_TRUNC。共享内存对象已经存在。则把它截断,使其长度为0


shm_open调用成功时返回一个文件描写叙述符。

该文件描写叙述符可用于兴许的mmap调用,从而将共享内存关联到调用进程。sh_open失败时返回-1。并设置errno


和打开的文件最后须要关闭一样。由shm_open创建的共享内存对象使用完之后也须要被删除。


#include<sys/mman.h>


#include <sys/stat.h>        /* For mode constants */


#include<fcntl.h>           /* For O_*constants */


intshm_unlink(const char *name);


该函数将name參数指定的共享内存对象标记为等待删除。当全部使用该共享内存对象的进程都是用ummap将它从进程中分离之后,系统将销毁这个共享内存对象所占领的资源。


共享内存实例


聊天室server程序:一个多进程server,一个子进程处理一个客户连接。同一时候,我们将全部客户socket连接的读缓冲设计为一块内存共享,server程序如程序清单1所看到的:


消息队列


消息队列是在两个进程之间传递二进制块数据的一种鸡蛋有效的方式。

每一个数据块都有一个特定的类型,接收方能够依据类型来有选择的接收数据。而不一定像管道和匿名管道那样必须以先进先出的方式接收数据。


Linux消息队列的4API包含四个系统调用:msggetmsgsndmsgcrvmsgctl


msgget系统调用


msgget系统调用创建一个消息队列,或者获取一个已有的消息队列。


#include <sys/types.h>


#include <sys/ipc.h>


#include <sys/msg.h>


 


int msgget(key_t key, int msgflg);


msgget系统调用一样,key參数是一个键值。用来标识一个全局唯一的消息队列。


msgflg參数的使用和含义与semget系统调用的sem_flags參数同样。


msgget成功时返回一个正整数值。它是消息队列的标识符。msgget失败时返回-1。并设置errno


假设msgget用于创建消息队列,则与之关联的内核数据结构msqid_ds将被创建并初始化。msqid_ds结构体的定义例如以下:


struct msqid_ds{


         structipc_perm msg_perm;             //消息队列的操作权限


         time_tmsg_stime;                             //最后一次调用msgsnd的时间


         time_tmsg_rtime;                              //最后一次调用msgrcv的时间


time_t msg_ctime;                             //最后一次被改动的时间


         unsignedlong __msg_cbytes;                   `//消息队列中已有的字节数


         msgqnum_tmsg_q num;                  //消息队列中已有的消息树=


         msglen_t msg_qbytes;                    //消息队列同意的最大字节数


         pid_tmsg_lspid;                                  //最后运行msgsnd的进程的PID


         pid_tmsg_lrpid;                                   //最后运行msgrcv的进程的PID


}


msgsnd系统调用


#include <sys/types.h>


#include <sys/ipc.h>


#include <sys/msg.h>


int msgsnd(int msqid, const void *msgp,size_t msgsz, int msgflg);


msqid是由msgget调用返回的消息队列标识符。


msg_ptr參数指向一个准备发送的消息,消息必须被定义例如以下类型:


struct msgbuf {


               long mtype;       /* message type, must be > 0 */


               char mtext[1];    /* message data */


          };


当中。mtype何曾元指定消息的类型。它必须是一个正整数。mtext是消息数据。msg_sz參数是消息的数据部分(mtext)的长度。这个长度能够为0,表示没有消息数据。


msgflg參数控制msgsnd的行为。

它通常仅支持IPC_NOWAIT标志。即以非堵塞的方式发送消息。默认情况下。发送消息假设消息队列满了。则msgsnd将堵塞。

IPC_NOWAIT标志被指定。则msgsnd调用可能马上返回并设置errnoEAGAIN


msgrcv系统调用


msgrcv系统调用从消息队列中获取消息:


#include <sys/types.h>


#include <sys/ipc.h>


#include <sys/msg.h>


ssize_t msgrcv(int msqid, void *msgp,size_t msgsz, long msgtyp, int msgflg);


msqid是消息队列标识符,msgp用于存储接收的消息,msgsz指的是消息数据部分的长度。msgtype參数指定接收何种类型的消息。我们能够使用例如以下几种方式指定消息类型:


msgtype等于0:读取消息队列中的第一个消息。


msgtype大于0:读取消息队列中的第一个类型为msgtype的消息。


msggtype小于0:读取消息队列中第一个类型值比msgtype绝对值小的消息。


msgflag控制msgrcv函数的行为。

它能够是例如以下标志的按位或:


IPC_NOWAIT:假设消息队列中没有消息,则msgrcv调用马上返回并设置errnoENOMSG


MSG_CEXEPT:假设msgtype大于0,则接受消息队列中第一个非msgtype类型的消息。


MSG_NOERROR:假设消息数据部分的长度超过了msgsz就将它截断。

,


msgctl系统调用


msgctl系统调用控制消息队列的某些属性,定义例如以下:


#include <sys/types.h>


#include <sys/ipc.h>


#include <sys/msg.h>


int msgctl(int msqid, int cmd, structmsqid_ds *buf);


msgqid是消息队列的标识符。cmd參数运行要运行的命令(比如IPC_STAT指将消息队列关联的内核数据结构拷贝到buf中)


程序清单1
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

struct client_data
{
    sockaddr_in address;
    int connfd;
    pid_t pid;
    int pipefd[2];
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
client_data* users = 0;
int* sub_process = 0;
int user_count = 0;
bool stop_child = false;

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void sig_handler( int sig )
{
    int save_errno = errno;
    int msg = sig;
    send( sig_pipefd[1], ( char* )&msg, 1, 0 );
    errno = save_errno;
}

void addsig( int sig, void(*handler)(int), bool restart = true )
{
    struct sigaction sa;
    memset( &sa, '', sizeof( sa ) );
    sa.sa_handler = handler;
    if( restart )
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset( &sa.sa_mask );
    assert( sigaction( sig, &sa, NULL ) != -1 );
}

void del_resource()
{
    close( sig_pipefd[0] );
    close( sig_pipefd[1] );
    close( listenfd );
    close( epollfd );
    shm_unlink( shm_name );
    delete [] users;
    delete [] sub_process;
}

void child_term_handler( int sig )
{
    stop_child = true;
}

int run_child( int idx, client_data* users, char* share_mem )
{
    epoll_event events[ MAX_EVENT_NUMBER ];
    int child_epollfd = epoll_create( 5 );
    assert( child_epollfd != -1 );
    int connfd = users[idx].connfd;
    addfd( child_epollfd, connfd );
    int pipefd = users[idx].pipefd[1];
    addfd( child_epollfd, pipefd );
    int ret;
    addsig( SIGTERM, child_term_handler, false );

    while( !stop_child )
    {
        int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure
" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )
            {
                memset( share_mem + idx*BUFFER_SIZE, '', BUFFER_SIZE );
                ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )
                {
                    stop_child = true;
                }
                else
                {
                    send( pipefd, ( char* )&idx, sizeof( idx ), 0 );
                }
            }
            else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )
            {
                int client = 0;
                ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )
                {
                    stop_child = true;
                }
                else
                {
                    send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );
                }
            }
            else
            {
                continue;
            }
        }
    }

    close( connfd );
    close( pipefd );
    close( child_epollfd );
    return 0;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number
", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    user_count = 0;
    users = new client_data [ USER_LIMIT+1 ];
    sub_process = new int [ PROCESS_LIMIT ];
    for( int i = 0; i < PROCESS_LIMIT; ++i )
    {
        sub_process[i] = -1;
    }

    epoll_event events[ MAX_EVENT_NUMBER ];
    epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd );

    ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );
    assert( ret != -1 );
    setnonblocking( sig_pipefd[1] );
    addfd( epollfd, sig_pipefd[0] );

    addsig( SIGCHLD, sig_handler );
    addsig( SIGTERM, sig_handler );
    addsig( SIGINT, sig_handler );
    addsig( SIGPIPE, SIG_IGN );
    bool stop_server = false;
    bool terminate = false;

    shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );
    assert( shmfd != -1 );
    ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); 
    assert( ret != -1 );

    share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );
    assert( share_mem != MAP_FAILED );
    close( shmfd );

    while( !stop_server )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure
" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d
", errno );
                    continue;
                }
                if( user_count >= USER_LIMIT )
                {
                    const char* info = "too many users
";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                users[user_count].address = client_address;
                users[user_count].connfd = connfd;
                ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );
                assert( ret != -1 );
                pid_t pid = fork();
                if( pid < 0 )
                {
                    close( connfd );
                    continue;
                }
                else if( pid == 0 )
                {
                    close( epollfd );
                    close( listenfd );
                    close( users[user_count].pipefd[0] );
                    close( sig_pipefd[0] );
                    close( sig_pipefd[1] );
                    run_child( user_count, users, share_mem );
                    munmap( (void*)share_mem,  USER_LIMIT * BUFFER_SIZE );
                    exit( 0 );
                }
                else
                {
                    close( connfd );
                    close( users[user_count].pipefd[1] );
                    addfd( epollfd, users[user_count].pipefd[0] );
                    users[user_count].pid = pid;
                    sub_process[pid] = user_count;
                    user_count++;
                }
            }
            else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
            {
                int sig;
                char signals[1024];
                ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int i = 0; i < ret; ++i )
                    {
                        switch( signals[i] )
                        {
                            case SIGCHLD:
                            {
	                        pid_t pid;
	                        int stat;
	                        while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
                                {
                                    int del_user = sub_process[pid];
                                    sub_process[pid] = -1;
                                    if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
                                    {
                                        printf( "the deleted user was not change
" );
                                        continue;
                                    }
                                    epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );
                                    close( users[del_user].pipefd[0] );
                                    users[del_user] = users[--user_count];
                                    sub_process[users[del_user].pid] = del_user;
                                    printf( "child %d exit, now we have %d users
", del_user, user_count ); 
                                }
                                if( terminate && user_count == 0 )
                                {
                                    stop_server = true;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                printf( "kill all the clild now
" );
                                //addsig( SIGTERM, SIG_IGN );
                                //addsig( SIGINT, SIG_IGN );
                                if( user_count == 0 )
                                {
                                    stop_server = true;
                                    break;
                                }
                                for( int i = 0; i < user_count; ++i )
                                {
                                    int pid = users[i].pid;
                                    kill( pid, SIGTERM );
                                }
                                terminate = true;
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else if( events[i].events & EPOLLIN )
            {
                int child = 0;
                ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );
                printf( "read data from child accross pipe
" );
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int j = 0; j < user_count; ++j )
                    {
                        if( users[j].pipefd[0] != sockfd )
                        {
                            printf( "send data to child accross pipe
" );
                            send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );
                        }
                    }
                }
            }
        }
    }

    del_resource();
    return 0;
}


版权声明:本文博主原创文章,博客,未经同意不得转载。

原文地址:https://www.cnblogs.com/yxwkf/p/4853443.html