Linux下的进程间通信(二)

一、     XSI IPC

1. 介绍

XSI IPC包括消息队列、信号量和共享内存,它们都用一个非负的标示符(identifier)加以引用。标识符是IPC对象的内部名,为了使进程间能够在同一个IPC上相连,就要有一个外部名。键(key)就是这个外部名,每个IPC对象都与一个键相关联。

  几种使客户进程与服务进程关联的方法

(1)    服务进程可以指定键IPC_PRIVATE创建一个新IPC结构,将返回的标识符存在某处(如:一个文件)以便客户进程读取。注意:使用IPC_PRIVATE只能是创建一个新IPC,且只有服务进程将标识符写到文件中后,客户进程才能获取到。

(2)    在公用都文件中定义一个客户进程和服务进程都认可的键。然后通过服务进程指定此键创建一个新的IPC结构。如已存在,需要进行错误处理(删除已存在的IPC,创建新的)

(3)    客户进程和服务进程认同一个路径名和项目ID(ID为0~255的字符),通过ftok将这两个值变换为一个键,然后再通过(2)进行后面的操作。

#include <sys/ipc.h>

key_t ftok(const char* path, int id);

返回值:如成功,返回键值,否则返回key_t-1

  权限

每个IPC结构都有一个ipc_perm结构,该结构规定了权限和所有者。在创建时赋值,并可以通过msgctl、semctl或shmctl进行更改操作,且进行这些操作必须是IPC的创建者或是超级用户。

  限制

三种形式的XSI IPC都有内置限制,这些限制大多可以通过重新配置内核进行更改。

  优缺点

(1)主要问题:

IPC结构是在系统范围内起作用的,没有访问计数。例如,如果一个进程创建了一个消息队列,在该队列中放入几个消息,然后终止,但是该消息队列及其内容并不会被删除,直到有一个进程调用msgrcv或msgctl读取消息队列或删除队列;或某个进程调用ipcrm命令删除消息队列;或由正在再启动的系统删除消息队列。

IPC结构在文件系统中没有文件名,所以所有文件系统的操作函数都无法使用,必须重新定义对应的新函数来对IPC进行操作。

(2) 优点

 可靠:因为客户进程和服务进程都在一台机器上

流是可控的:有阻塞控制之类的流控制策略

面向记录:数据不会因为一方的关闭而丢失

可以用非先进先出方式处理:消息有自己的优先级

IPC类型

无连接?

可靠?

流可控?

记录?

优先级?

消息队列

×

STREAMS

×

UNIX域流套接字

×

×

×

UNIX域数据报套接字

×

×

FIFO(非STREAMS

×

×

×

             

二、     信号量(semaphore)

  又称为信号灯,与管道、FIFO以及消息队列不同,它是一个计数器,用于多进程对共享数据对象的访问控制。它是用来协调不同进程间的数据对象的,而最主要的应用是共享内存方式的进程间通信。相当于内存中的标志,进程可以根据它判定是否能够访问某些共享资源,同时,进程也可以修改该标志。除了用于访问控制外,还可用于进程同步。

分为二值信号灯和计算信号灯;前者只能是0或者1,类似于互斥锁;后者可以取任意非负值,类似于读写锁。

它是随内核持续的,也就是说,必须重启系统或者显式的删除一个信号量集时,该信号量集才会真正被删除。

为了获取共享资源,进程需要执行下列操作:

(1)    测试控制该资源的信号量

(2)    若此信号量的值为正,则进程可以使用该资源,进程将信号量减1,表示它使用了一个资源单位

(3)    若此信号量的值为0,则进程进入休眠状态,直至信号量的值大于0。进程被唤醒,返回第(1)步

当进程不再使用有一个信号量控制的共享资源时,该信号量值加1。如果有进程正在休眠等待此信号量,则唤醒它们。所有的加1减1操作都是原子操作。

特性:

(1)                信号量可能是多个非负值的集合,在创建时要指定集合中信号量值的数量。

(2)                创建信号量(semget)与对其赋初值(semctl)分开。

(3)                没有进程使用的XSI IPC不会像管道那样自动直接释放,而是如果没有手动释放就会一直保留。

内核为每个信号量集合设置了一个semid_ds结构,里面包含了该信号量的一些数据,如信号量集合中的信号量数量,最后修改时间等。

1. 函数

l  int semget(key_t key, int nsems, int flag)

头文件:sys/sem.h

参数:key为信号量的键值,nsems为信号量集合的信号量数量,flag为创建属性,如IPC_CREAT,IPC_EXCL等

返回值:如果成功,返回信号量ID,否则返回-1

l  int semctl(int semid, int semnum, int cmd, .../* union semun arg */);

头文件:sys/sem.h

参数:semid为信号量ID;semnum为该信号量集合中的一个成员(0~nsems-1);cmd为命令参数:IPC_STAT,IPC_SET,IPC_RMID,GETVAL,SETVAL,GETPID,GETNCNT,GETZCNT,GETALL和SETALL等;semun为一个联合体,成员针对于cmd中的不同命令。

返回值:除了GET命令,函数返回相应的值,其他都返回0

l  int semop(int semid, struct sembuf semoparray[], size_t nops)

头文件:sys/sem.h

参数:semid为信号量的ID;semoparray指向信号量操作数组,信号量操作由sembuf结构表示:

struct sembuf{

       unsigned short sem_num;       /*member # in set 0 ~ nsems-1 */

       short              sem_op;          /*operation (negative , 0, or positive*/

       short              sem_flg;          /*IPC_NOWAIT, SEM_UNDO*/

}

sem_num对应信号集中的信号灯,0对应第一个信号灯。

sem_op的值大于0,等于0以及小于0确定了对sem_num指定的信号灯进行的三种操作。

sem_flg可取IPC_NOWAIT以及SEM_UNDO两个标志。如果设置了SEM_UNDO标志,那么在进程结束时,相应的操作将被取消,这是比较重要的一个标志位。如果设置了该标志位,那么在进程没有释放共享资源就退出时,内核将代为释放。如果为一个信号灯设置了该标志,内核都要分配一个sem_undo结构来记录它,为的是确保以后资源能够安全释放。

nops规定该数组中操作的数量,此值可以是负值、0或正值:

n  如果为正,表示进程释放占用的资源数,sem_op值加到信号量的值上,如果指定了undo标志,则也从该进程的此信号量调整值中减去sem_op。

n  如果为负,表示要获取由该信号量控制的资源。

如果信号量值小于sem_op的绝对值(即资源不满足要求),则:

(a)    若指定了IPC_NOWAIT,则semop出错,返回EAGAIN

(b)    若未指定IPC_NOWAIT,则该信号量的semncnt值加1(因为调用进程将进入休眠状态),然后调用进程被挂起直至下列事件之一发生:

ⅰ此信号量编程大于等于sem_op的绝对值,此信号量的semncnt减1(因为挂起已恢复)并且从信号量值中减去sem_op的绝对值,如果指定了undo标记,则sem_op的绝对值也加到该进程的此信号量调整值上。

ⅱ从系统中删除此信号量,函数出错,返回EIDRM

ⅲ进程扑捉到一个信号,并从信号处理程序返回。在此情况下,此信号量的semncnt值减1(因为调用进程不再等待),函数出错返回EINTR

n  如果sem_op为0,表示调用进程希望等到该信号量值变成0。

如果信号量值当前为0,则函数立即返回;如果非0,则:

(a)  若指定了IPC_NOWAIT,则出错返回EAGAIN

(b)  若未指定IPC_NOWAIT,则该信号量的semzcnt值加1(因为调用进程将进入休眠状态,然后调用进程挂起,直至下列事件之一发生:

ⅰ此信号量值变为0,此信号量的semzcnt值减1(因为调用进程结束休眠)

ⅱ从系统中删除此信号量,函数出错,返回EIDRM

ⅲ进程扑捉到一个信号,并从信号处理程序返回。在此情况下,此信号量的semzcnt值减1(因为调用进程不再等待),函数出错返回EINTR。

该函数具有原子性。

l  exit时的信号量调整

只要信号量操作指定了SEM_UNDO标志,然后分配资源(sem_op值小于0),那么内核就会记住对于该特定信号量,分配给调用经常多少资源,当该进程终止时,不论自愿还是不自愿,内核都将检验该进程是否还有尚未处理的信号量调整值,如果有,则按调整值对相应量值进行处理。

如果用带SETVAL或SETALL命令的semctl设置一信号量的值,则在所有进程中,对于该信号量的调整值都设置为0。

2. 信号量与记录锁对比

记录锁时间消耗比信号量锁多60%左右。

如果只使用锁一个资源并且不需要使用XSI信号量的所有花哨的功能,则宁可使用记录锁,理由是使用简单,且进程终止时系统会处理任何遗留下来的锁。

3. 代码:自己写一个互斥锁

#include <iostream.h>
#include <sys/sem.h>

#define LOCK_PATH "/tmp/mylock"
#define MAX_TRIES 20
#define SEM_ID    10

int semid, initflag;
struct sembuf postop, waitop;


void my_lock(int fd)
{
    int  oflag,i;
    union semun arg;
    struct semid_ds  seminfo;
    
    if(initflag == 0)
    {
        oflag = IPC_CREAT | IPC_EXCL | SVSEM_MODE;
        if((semid = semget(ftok(LOCK_PATH, fd), 1, oflag)) >= 0)
        {    
            arg.val = SEM_ID;        
            semctl(semid, 0, SETVAL, arg);
        }
        else if(errno == EEXIST)
        {
            semid = semget(ftok(LOCK_PATH, fd), 1, SVSEM_MODE);
            for(i = 0; i < MAX_TRIES; i++)
            {
                semctl(semid, 0, IPC_STAT, arg);
                if(arg.buf->sem_otime != 0)
                {
                    break;
                } 
                sleep(1); 
            }
            if(i == MAX_TRIES)
            {
               perror("semget OK,but semaphore not initialized");    
               return -1;
            }             
        }
        else
        {
            perror("semget Error");
            return -1;
        } 
        initflag = 1;         
        postop.sem_num = 0;        
        postop.sem_op = 1;        
        postop.sem_flag = SEM_UNDO;        
        waitop.sem_num = 0;        
        waitop.sem_op = -1;        
        waitop.sem_flag = SEM_UNDO;
    }
    semop(semid,&waitop, 1); //从信号量中减去1 
}

void my_unlock(int fd)
{
    semop(semid, &postop, 1);    //信号量中加上1 
}

三、     消息队列

消息队列是消息的链接表,存放在内核中并由消息队列标识符标识。说白了,就是一个队列,全局的。

1. 函数

l  int msgget(key_t key, int flag)

头文件:sys/msg.h

参数:key为消息队列标识符,flag为一些配置的参数,如权限等

返回值:如果成功,返回消息队列ID,否则返回-1

l  int msgctl(int msqid, int cmd, struct msqid_ds *buf)

头文件:sys/msg.h

参数:msqid是消息队列ID,cmd说明队列要执行的命令:

IPC_STAT 取此队列的msqid_ds结构,并将它放在buf中。

IPC_SET 将buf指向的设置为队列的msqid_ds配置

IPC_RMID 从系统中删除该消息队列以及仍在该队列中的所有数据。

返回值:如果成功返回0,否则返回-1

l  int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag)

头文件:sys/msg.h

参数:msqid是消息队列ID,ptr为将要发送的mymesg的指针,nbytes为要发送的长度,flag表示发送方式的一些参数,如IPC_NOWAIT等。

有自定义结构体mymesg如下:

struct mymesg{

       long mtype;

       char mtext[512];

}

mtype为发送的消息类型,mtext为发送的数据。

返回值:如果成功,返回0;否则返回-1。

l  ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag)

头文件:sys/msg.h

参数:msqid是消息队列ID,ptr为存放接收数据的buffer指针,type指定我们想要接收哪一种消息,flag指定接收方式的一些方式,如IPC_NOWAIT。

type == 0 返回第一个消息

type > 0  返回队列中消息类型为type的第一个消息

type < 0  返回队列中消息类型值小于等于type绝对值的消息。如果有多个,则返回类型值最小的消息。

返回:如成功,返回接收的数据长度,否则返回-1。

Msgrcv成功执行时,内核更新与该消息队列相关联的msqid_ds结构,以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并将队列中的消息树(msg_qnum)减1.

2. 实例

一个send进程和一个receive进程,send进程通过用户输入信息和信息类型,向一个消息队列中发送消息,receive进程通过用户输入信息类型,从消息队列中获取相应的消息。

send_test.cpp

#include <iostream.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define BUF_SIZE 1024
#define PATH_NAME "."
#define PROJ_ID 32
#define MAX_SIZE 5120

using namespace std;

typedef struct _mymsgbuf
{
    long msgtype;
    char message[BUF_SIZE];
}mymsgbuf;

int main(int argc, char *argv[])
{
    int queueid,i;
    int msglen;
    key_t qkey;
    FILE *fp;
    mymsgbuf msgBuf;
    char input[20];
    char buf[BUF_SIZE];
    
    qKey = ftok(PATH_NAME, PROJ_ID);
    if(qKey < 0)
    {
        perror("ftok error!");
        return -1;
    }
    
    if((queueid = msgget(qKey, IPC_CREAT|0660)) == -1)
    {
        perror("msgget error!");
        return -1;
    }
    
    while(1)
    {
        cout<<"please input the data:"<<endl;
        memset(msgBuf.message, 0, BUF_SIZE);
        fgets(msgBuf.message,BUF_SIZE,stdin);//从标准输入读取字符存至message
        
        cout<<"please input the type:(long)"<<endl;
        memset(input, 0, 20);
        fgets(input, 20, stdin);
        msgBuf.msgtype = atol(input);
        
        msglen = strlen(msgBuf.message) + sizeof(long);
        
        if(msgsnd(queueid,&msgBuf,msglen,0) == -1)
        {
            perror("msgsnd error!\n");
            return -1;
        }

        struct msqid_ds myMsq;
        if(msgctl(queueid, IPC_STAT, &myMsq) == -1)
        {
            perror("msgctl error!");
            return -1;
        }
        cout<<"There is "<<myMsq.msg_qnum<<" messages in queue!"<<endl;

        if(memcmp(msgBuf.message, "end", strlen("end")) == 0)
        {
            msgctl(queueid,IPC_RMID,NULL);
            cout<<"the process quit!"<<endl;
            break;
        }
    }
    
    return 0;
}

receive_test.cpp

#include<iostream.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/stat.h>

#define PROJ_ID 32
#define PATH_NAME "."
#define BUF_SIZE 1024

using namespace std;

typedef struct _mymsgbuf
{
    long msgtype;
    char message[BUF_SIZE];
}mymsgbuf;

int main(int argc ,char *argv[])
{
    int queueid;
    int msglen;
    mymsgbuf msgbuf;
    char input[20];
    key_t qKey;
    
    qKey = ftok(PATH_NAME, PROJ_ID);
    if(qKey < 0)
    {
        perror("ftok error!");
        return -1;
    }
    
    if((queueid = msgget(qKey, IPC_CREAT|0660)) == -1)
    {
        perror("msgget eror!\n");
        return -1;
    }
    
    while(1)
    {
        cout<<"please input the type:(long)"<<endl;
        memset(input, 0, 20);
        fgets(input, 20, stdin);
        int msgtype = atol(input);
        memset(msgbuf.message, 0, BUF_SIZE);
        if(msgrcv(queueid, &msgbuf, BUF_SIZE, msgtype, 0) == -1)
        {
            perror("Send msgrcv error!\n");
            exit(1);
        }
        cout<<"I receive data :"<<msgbuf.message<<"type is "<<msgbuf.msgtype<endl;
        struct msqid_ds myMsq;
        if(msgctl(queueid, IPC_STAT, &myMsq) == -1)
        {
            perror("msgctl error!");
            return -1;
        }
        cout<<"There is "<<myMsq.msg_qnum<<" messages in queue!"<<endl;
        
        if(memcmp(msgbuf.message, "end", strlen("end")) == 0)
        {
            cout<<"the process quit!"<<endl;
            break;
        }
    }
    return 0;
}

四、     共享内存

  共享内存允许两个或多个进程共享一块给定的存储区,因为数据不需要在客户进程和服务进程之间进行拷贝,所以它是最快的一种IPC。要注意同步问题,通常,信号量被用来实现对共享存储访问的同步(记录锁业可用于这种场景)。

下面主要介绍System V环境下的共享存储,而另一种,即POSIX下的共享存储暂不介绍。

内核为每个共享存储段设置了一个shmid_ds结构:

struct shmid_ds {

         struct ipc_perm         shm_perm;  /* operation perms */

         size_t       shm_segsz;     /* size of segment (bytes) */

         time_t        shm_atime;  /* last attach time */

         time_t        shm_dtime;  /* last detach time */

         time_t        shm_ctime;  /* last change time */

         pid_t     shm_cpid;    /* pid of creator */

         pid_t     shm_lpid;     /* pid of last operator */

         unsigned short           shm_nattch; /* no. of current attaches */

         unsigned short          shm_unused;      /* compatibility */

         void                     *shm_unused2;  /* ditto - used by DIPC */

         void                     *shm_unused3;  /* unused */

};

1. 函数

l  int shmget(key_t key, size_t size, int flag)

说明:为了获得一个共享存储的标识符

头文件:sys/shm.h

参数:key为键值,可以通过ftok生成,size为共享存储段的长度,如果是引用一个现存的段,则size可以为0,如果是创建一个新段,则必须指定size,flag指明权限设置。

返回值:成功返回共享存储ID,否则返回-1。

l  int shmctl(int shmid, int cmd, struct shmid_ds *buf)

说明:对共享存储段执行多种操作。

头文件:sys/shm.h

参数:shmid为共享存储段ID;cmd为IPC_STAT、IPC_SET、IPC_RMID、SHM_LOCK和SHM_UNLOCK五种命令的一种;buf为进行操作的共享存储段设置属性。

返回值:成功返回0,否则返回-1。

l  void *shmat(int shmid, const void *addr, int flag)

说明:一旦创建了一个共享存储段,进程就可以调用shmat将其连接到它的地址空间中。

头文件:sys/shm.h

参数:shmid为共享存储段的ID

addr为0,则此段连接到由内核选择的第一个可用地址上,这是推荐的方式

addr不为0,且没有指定SHM_RND,则此段连接到addr所指定的地址上

addr不为90,且指定了SHM_RND,则此段连接到(addr-(addr mod ulus SHMLBA))所表示的地址上。SHM_RND表示“取整”的意思。SHMLBA表示“低边界地址倍数”,它总是2的乘方。

返回值:成功返回指向共享存储的指针,否则返回-1。

l  int shmdt(void *addr)

说明:当对公共村春段的操作已经结束时,通过调用该函数进行脱接。经过脱接后,该标识符不会被删除,只有通过调用shmctl,并设置参数为IPC_RMID才能删除该标识符。

头文件:sys/shm.h

参数:addr为事先分配的公共存储段。

返回值:成功返回0,否则返回-1。

2. 代码:一个很好的共享内存与信号量的例子

有三个信号量,一个为写信号量fullid,即如果信号量达到上限,生产者就不能继续往共享存储array中写数据;一个为度信号量emptyid,即如果信号量为0,消费者就不能从共享存储中读数据;最后一个为互斥信号量,即无论是生产者还是消费者,在同一时间只能其中一个进行操作。

(转自http://linux.chinaitlab.com/c/825371.html

#include <iostream.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <errno.h>
#include <time.h>
#include <sys/sem.h>
#include <sys/wait.h>

#define MAXSHM  5    
int fullid;
int emptyid;
int mutexid; 

union semun
{
    int val;                           //<= value for SETVAL
    struct semid_ds *buf;              //<= buffer for IPC_STAT & IPC_SET
    unsigned short int *array;         //<= array for GETALL & SETALL
    struct seminfo *__buf;             //<= buffer for IPC_INFO
};

int main(int argc, char *argv[])
{
    int arrayid, getid;    //定义两个共享内存的ID
    int *array, *get;    //定义两个共享内存的地址
    
    /* 创建共享内存 */
    arrayid = shmget(IPC_PRIVATE, sizeof(int) * MAXSHM, IPC_CREAT|0666); 
    getid = shmget(IPC_PRIVATE, sizeof(int), IPC_CREAT|0666);
    
    /* 初始化共享内存 */ 
    array = (int*)shmat(arrayid, 0, 0);
    get = (int*)shmat(getid, 0, 0);
    
    /* 定义信号量数据结构 */
    struct sembuf P, V;
    union semun arg;
    
    /* 创建信号量 */
    fullid = semget(IPC_PRIVATE, 1, IPC_CREAT|0666);
    emptyid = semget(IPC_PRIVATE, 1, IPC_CREAT|0666);
    mutexid = semget(IPC_PRIVATE, 1, IPC_CREAT|0666);
    
    /* 初始化信号量 */
    arg.val = 0;            //初始时缓冲区中无数据
    if(semctl(fullid, 0, SETVAL, arg) == -1)
        perror("semctl setval error");
        
    arg.val = MAXSHM;        //初始时缓冲区中有5个空闲的数组元素 
    if(semctl(emptyid, 0, SETVAL, arg) == -1)
        perror("semctl setval error");
        
    arg.val = 1;
    if(semctl(mutexid, 0, SETVAL, arg) == -1)
        perror("semctl setval error");
        
    /* 初始化 P V 操作 */
    P.sem_num = 0;
    P.sem_op = -1;
    P.sem_flg = SEM_UNDO;
    
    V.sem_num = 0;
    V.sem_op = 1;
    V.sem_flg = SEM_UNDO;
    
    /* 生产者进程 */
    if(fork() == 0)
    {
        int i = 0;
        int set = 0;
        while(i < 10)
        {
            semop(emptyid, &P, 1);
            semop(mutexid, &P, 1);
            
            array[set % MAXSHM] = i + 1;
            printf("Producer put number %d to No.%d\n", array[set%MAXSHM], set%MAXSHM);        
            set++;
            semop(mutexid, &V, 1);
            semop(fullid, &V, 1);
            i++;
        }
        sleep(3);
        printf("Producer if over\n");
        return 0;
    }
    else
    {
        /* 消费者进程A */
        if(fork() == 0)
        {
            while(1)
            {
                if(*get == 10)
                    break;
                    
                semop(fullid, &P, 1);
                semop(mutexid, &P, 1);
                printf("The consumerA get number from No.%d\n", (*get)%MAXSHM);
                (*get)++;
                semop(mutexid, &V, 1);
                semop(emptyid, &V, 1);
                sleep(1);
            }
            printf("ConsumerA is over\n");
            return 0;
        }
        else
        {
            if(fork() == 0)
            {
                while(1)
                {
                    if(*get == 10)
                        break;
                    semop(fullid, &P, 1);
                    semop(mutexid, &P, 1);
                    printf("The consumerB get number from No.%d\n", (*get)%MAXSHM);
                    (*get)++;
                    semop(mutexid, &V, 1);
                    semop(emptyid, &V, 1);
                    sleep(1);
                }
                printf("ConsumerB is over\n");
                return 0;
            }
        }
    }
    /* 父进程返回后回收3个子进程 */
    wait(0);
    wait(0);
    wait(0);
    
    /* 断开并撤销2个共享内存 */
    shmdt(array);
    shmctl(arrayid, IPC_RMID, 0);
    
    shmdt(get);
    shmctl(get, IPC_RMID, 0);
    
    /* 撤销3个信号量集 */
    semctl(emptyid, IPC_RMID, 0);
    semctl(fullid, IPC_RMID, 0);
    semctl(mutexid, IPC_RMID, 0);
    
    return 0;
}
原文地址:https://www.cnblogs.com/geekma/p/2588711.html