四十九、进程间通信——System V IPC 之消息队列

49.1 System V IPC 介绍

49.1.1 System V IPC 概述

  • UNIX 系统存在信号、管道和命名管道等基本进程间通讯机制
  • System V 引入了三种高级进程间通信机制
    • 消息队列、共享内存和信号量
  • IPC 对象(消息队列、共享内存和信号量)存在于内核中而不是文件系统中,由用户控制释放(用户管理 ipc 对象的生命周期),不像管道的释放由内核控制
  • IPC 对象通过其标识符来引用和访问,所有 IPC 对象在内核空间中有唯一性标识 ID,在用户空间中的唯一性标识称为 key。
  • Linux IPC 继承自 System V IPC

49.1.2 System V IPC 对象的访问

  • IPC 对象是全局对象
    • 可用 ipcs, ipcm 等命令查看或者删除
  • 每个 IPC 对象都由 get 函数创建
    • msgger, shmget, semget
    • 调用 get 函数时必须指定关键字 key

49.1.3 IPC 对象的权限和所有者结构体

  

49.1.4 消息队列

  • 消息队列是内核中的一个链表
  • 用户进程将数据传输到内核后,内核重新添加一些如用户 ID、组 ID、读写进程的 ID 和优先级等相关信息后,并打成一个数据包称为消息
  • 允许一个或者多个进程往消息队列中写消息和读消息,但一个消息只能被一个进程读取,读取完毕后就自动删除
  • 消息队列具有一定的 FIFO 的特性,消息可以按照顺序发送到队列中,也可以几种不同的方式从队列中读取。每一个消息队列在内核中用一个唯一的 IPC 标识 ID 表示
  • 消息队列的实现包括创建和打开队列、发送消息、读取消息和控制消息队列四种操作

49.1.5 消息队列的属性、打开、创建和控制

(1)消息队列属性

  

(2)打开或创建消息队列

1 #include <sys/msg.h>
2 int msgget(key_t key, int flag);
  • 函数参数:
    • key:用户指定的消息队列键值
    • flag:IPC_CREAT、IPC_EXCL 等权限组合
  • 返回值:
    • 成功,则返回内核中消息队列的标识 ID,出错,则返回 -1
  • 若创建消息队列,key 可指定键值,也可将之设置为 IPC_PRIVATE。若打开进行查询,则 key 不能为 0,必须是一个非零的值,否则是查询不到的。

(3)消息队列控制

  

  • 函数参数:
    • msgid:消息队列 ID
    • buf:消息队列属性指针
    • cmd
      • IPC_STAT:获取消息队列的属性,取此队列的 msqid_ds 结构,并将其存放在 buf 指向的结构中
      • IPC_SET:设置属性,按由 buf 指向的结构中的值,设置与此队列相关的结构中的字段
      • IPC_RMID:删除队列,从系统中删除该消息队列以及仍在该队列上的所有数据

(4)消息队列的发送和接收

  

  msgp 参数是一个指向 msgbuf 的指针

  

  • 函数参数
    • msgid:消息队列 ID
    • msgq:最终要发送(接受)的数据
      • mtype:指消息的类型,它由一个整数来代表,并且它只能是大于 0 的整数;可以通过此数据接受特定的消息
      • mtext:消息数据本身,可以是文本类型,也可以是二进制等
      • 在 Linux 中,消息的最大长度是 4096 个字节,其中包括 mtype,它占有 4 个字节
      • 结构体 msgbuf 用户可自定义,但是第一个成员必须是 mtype
    • msgsz:指定消息的大小,不包括 mtype 的大小,仅是消息的大小
      • msgsz = sizeof(struct mymesg) - sizeof(long)
    • msgtyp:消息类型
      • msgtyp = 0:获得消息队列中第一个消息
      • msgtyp > 0:获得消息队列中类型为 type 的第一个消息
      • msgtyp < 0:获得消息队列中小于或等于 msgtyp 绝对值的消息(类型最小的)
    • msgflag:
      • 0:阻塞
      • IPC_NOWAIT:类似于文件 I/O 的非阻塞 I/O 标识
      • 若消息队列已满(或者是队列中的消息总数等于系统限制值,或队列中的字节总数等于系统限制值),则指定 IPC_NOWAIT 使得 msgsnd 立即出错返回 EAGAIN。如果指定0,则进程:
        • 阻塞直到有空间可以容纳要发送的消息
        • 或从系统中删除了此队列
        • 或捕捉到一个信号,并从信号处理程序返回

49.2 例子

49.2.1 消息队列发送

  msg_snd.c

 1 #include <string.h>
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <sys/msg.h>
 5 
 6 typedef struct {
 7     long    type;///< 消息数据
 8     int     start;///< 消息数据本身(包括start 和 end)
 9     int     end;
10 }MSG;
11 
12 /** 往消息队列中发送消息 */
13 int main(int argc, char *argv[])
14 {
15     if(argc < 2){
16         printf("usage: %s key
", argv[0]);
17         return 1;
18     }
19 
20     key_t key = atoi(argv[1]);
21     printf("key: %d
", key);
22 
23     /** 创建消息队列 */
24     int msg_id;
25     if((msg_id = msgget(key, IPC_CREAT | IPC_EXCL | 0777)) < 0){
26         perror("msgget error");
27     }
28 
29     printf("msg id: %d
", msg_id);
30 
31     /** 定义要发送的消息 */
32     MSG m1 = {4, 4, 400};
33     MSG m2 = {2, 2, 200};
34     MSG m3 = {1, 1, 100};
35     MSG m4 = {6, 6, 600};
36     MSG m5 = {6, 60, 6000};
37 
38     /** 发送消息到消息队列 */
39     if(msgsnd(msg_id, &m1, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0){
40         perror("msgsnd error");
41     }
42 
43     if(msgsnd(msg_id, &m2, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0){
44         perror("msgsnd error");
45     }
46 
47     if(msgsnd(msg_id, &m3, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0){
48         perror("msgsnd error");
49     }
50 
51     if(msgsnd(msg_id, &m4, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0){
52         perror("msgsnd error");
53     }
54 
55     if(msgsnd(msg_id, &m5, sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0){
56         perror("msgsnd error");
57     }
58 
59     /** 发送后去获得消息队列中消息的总数 */
60     struct msqid_ds ds;
61     if(msgctl(msg_id, IPC_STAT, &ds) < 0){
62         perror("msgctl error");
63     }
64 
65     printf("msg total: %ld
", ds.msg_qnum);
66 
67     return 0;
68 }

  编译运行:

  

  

  删除消息队列:

  

49.2.2 消息队列接收

  msg_rcv.c

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <string.h>
 4 #include <sys/msg.h>
 5 
 6 typedef struct {
 7     long    type;
 8     int     start;
 9     int     end;
10 }MSG;
11 
12 int main(int argc, char *argv[])
13 {
14     if(argc < 3){
15         printf("usage: %s key type
", argv[0]);
16         return 1;
17     }
18 
19     int key = atoi(argv[1]);
20     long type = atoi(argv[2]);
21 
22     /** 获得指定的消息队列 */
23     int msg_id;
24     if((msg_id = msgget(key, 0777)) < 0){
25         perror("msgget error");
26     }
27     printf("msg id: %d
", msg_id);
28 
29     /** 从消息队列中接收指定类型的消息 */
30     MSG m;
31     if(msgrcv(msg_id, &m, sizeof(MSG) - sizeof(long), type, IPC_NOWAIT) < 0){
32         perror("msgrcv error");
33     }
34     else{
35         printf("type: %ld start: %d end: %d
", m.type, m.start, m.end);
36     }
37 
38     return 0;
39 }

  编译后,与 msg_snd 联调

  

  

  再次接收 6 的时候,已经没有消息了

  

  消息队列还存在,但是消息都已经被接受,接受一条消息后,内核就会自动删除那条消息

原文地址:https://www.cnblogs.com/kele-dad/p/10301066.html