消息队列-Message Queue

消息队列: message queue

  消息队列是Linux IPC中常用的一种通信方式。

  消息队列是随内核的持续性,只要内核没有重新自举,Linux系统没有重启,都是一直存在的。

一 posix --- 消息队列

头文件  #include <mqueue.h>

link with -lrt

1. 创建消息队列

#include <fcntl.h>       /* For O_*  constants */
#include <sys/stat.h>    /* For mode constants */

mqd_t  mq_open(const char * name, int oflag);
mqd_t  mq_open(const char * name, int ofalg, mode_t mode, struct mq_attr * attr); 
/**************************************************************************************************************************
(1) name: 消息队列名字, 以 '/' 开始,不能有其它的‘/’
(2) oflag:决定访问(mq_send/mq_receive)消息队列的方式, O_RDONLY O_WRONLY O_RDWR, 可以用 '|' 设定 O_CREAT O_EXCL  O_NONBLOCK , 
    O_CREAT: 如果消息队列不存在,就创建新的消息队列。此时需要参数mode 和 attr。
    O_EXCL:  需要和O_CREAT连用,如果消息队列存在,返回失败,用于检测消息队列是否存在。
    O_NONBLOCK: 决定向消息队列中写入或读取时采用非阻塞方式,比如消息队列已满,此时写入,立即返回失败;消息队列没有消息,此时读取,立即返回失败。
(3) mode UGO(用户组其他人)权限(读,写,可执行),八进制 0664
(4) attr设置为NULL,则为默认属性
struct mq_attr    //至少四个成员
{
  long mq_flags    // 0 或者 O_NONBLOCK, 默认为0  
  long mq_maxmsg   //消息队列最多消息数  ubuntu16.04 默认10
  long mq_msgsize  //消息队列中,消息的最大字节数  ubuntun16.04 默认8192 (8KB)
  long mq_curmsgs  //消息队列中,当前的消息数
};                  
(5) 返回值: 成功 返回消息队列描述符,  失败返回-1并设置errno
*//***********************************************************************************************************************/

 2. 发送消息/接收消息

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,unsigned *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes, char * msg_ptr, size_t msg_len, unsigned * msg_prio, const struct timespec * abs_timeout);

/****************************************************************************************************************************
1. mqdes   消息队列描述符
2. msg_ptr 发送消息/接收消息的缓冲区
3. msg_len 消息体的长度, mq_send中msg_len<=mq_msgsize, mq_receive中msg_len>=mq_msgsize.
4. msg_prio 消息的优先级,值越大,优先级越高,最大为MQ_PRIO_MAX(至少32), 同时一个进程可以打开的最大消息队列数目MQ_OPEN_MAX(至少8)
  mq_receive返回优先级最高的最早消息. 不需要设置优先级,设为0 或 NULL
5. abs_timeout 绝对时间, mq_timedsend, mq_timedreceive限时发送,接收
6. mq_send 成功返回0, 失败返回-1, mq_receive 成功返回消息字节数,失败返回-1
*//*************************************************************************************************************************/

3. 关闭消息队列

/** 关闭描述符和消息队列的联系  成功返回0,失败返回-1 */
int mq_close(mqd_t mqdes);

4. 删除消息队列

/** 从内核中删除名为name的消息队列. 成功返回0,失败返回-1 */
/** 消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。*/
/** 每个消息队列都有一个保存当前打开着描述符数的引用计数器 */
/** 消息队列的销毁会被推迟到所有的引用都被关闭时执行, mq_unlink不会阻塞 */
int mq_unlink(const char *name);

5. 获取/设置消息队列属性

/** 获取消息队列的属性,  成功返回0, 失败-1 */
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);

/** 设置消息队列的属性(只有mq_flags有效,其它忽略), 成功返回0, 失败-1 */
int mq_setattr(mqd_t mqdes, const struct mq_attr * mqstat, struct mq_attr * omqstat);
/** mqstat新属性, omqstat保存原来的属性 */

6. 通知进程可以接收一条消息

/** 在调用进程注册一个异步通知函数, 成功返回0,失败-1*/
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

/***********************************************************
1. notification为NULL, 并且这个进程之前注册过了通知函数, 则会取消调用进程之前注册的通知
2. notification不为NULL,在调用进程注册一个异步通知函数.消息队列从空到非空,会通知
3. 任何时候, 一个消息队列只能被一个进程注册通知. 如果之前调用进程或者其他进程已经注册过了,那么随后调用本函数注册通知会返回失败.
只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。
*//********************************************************/

 struct sigevent : 链接

二 System V --- 消息队列

头文件 #include <sys/msg.h>

1.创建消息队列

/** 创建消息队列, 成功返回标识符(非负整数, 可以是0), 失败返回-1 【备注:第一次调用该函数的返回值有可能是0,测试结果为没有错误信号errno】*/
int msgget(key_t key, int msgflg);

/******************************************************
1. key IPC键值,  IPC_PRIVATE 或者 其它key值,
不建议使用IPC_PRIVATE,因为最大创建的消息队列数是有限制的,并且消息队列是以内核的持续性,使用IPC_PRIVATE每次都会创建新的消息队列,消耗OS资源. 2. msgflg IPC_CREAT 消息队列不存在就创建(可以和存取权限控制符连用,例如 0664),已存在就打开 IPC_EXCL 和IPC_CREAT连用,消息队列不存在就创建,有就返回一个错误
3. key_t ftok( const char * path, int id );创建一个IPC键值
path: 已存在文件的路径,可以用当前路径 "./"
id: 子序号, 低八位有效,即有效值为1~255,不能为0
path和id都一样,返回的key值也相同
*//***************************************************/

2.发送/接收消息

/***********************************************************
* 消息类型必须是长整型的,而且必须是结构体类型的第一个成员,
* 消息类型下面是消息正文,正文可以有多个成员(正文成员可以是任意数据类型的)。
* 至于消息结构体类型叫什么名字,成员叫什么名字,自行定义,没有明文规定。
*//********************************************************/
typedef struct _msg
{
 long mtype;   // 消息类型
 char mtext[100]; // 消息正文
 //…… ……          // 消息的正文可以有多个成员
}MSG;
/**********************************************************
* 从消息队列中获取消息,获取成功后,该消息从消息队列中删除
* msqid:消息队列标识符
* msgp:存放消息
* msgsz:消息的长度(不包括消息类型的long)
* msgtyp:消息类型
   msgtyp = 0:返回队列中的第一个消息。
    msgtyp > 0:返回队列中消息类型为 msgtyp 的消息(常用)。
    msgtyp < 0:返回队列中消息类型值小于或等于 msgtyp 绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。
  注意:在获取某类型消息的时候,若队列中有多条此类型的消息,则获取最先添加的消息,即先进先出原则。 * msgflg:一般为0 表示如果没有获取消息后的行为, 以下三种行为可以用 | 一起使用 0:msgrcv() 调用阻塞直到接收消息成功为止. MSG_NOERROR: 若返回的消息字节数比 nbytes 字节数多,则消息就会截短到 nbytes 字节,且不通知消息发送进程。 IPC_NOWAIT: 调用进程会立即返回。若没有收到消息则立即返回 -1。 返回值:成功为读取的消息长度,失败-1
*//********************************************************/ ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); /********************************************************** * 将消息添加到消息队列中 * msqid:消息队列标识符 * msgp:要发送的消息 * msgsz:消息的长度(不包括消息类型的long) * msgflg:一般为0, IPC_NOWAIT(2048 == 1000 0000 0000) if(msgflg&IPC_NOWAIT ==0) 一直阻塞到条件满足 if(msgflg&IPC_NOWAIT !=0) 消息没有立即发送则调用的进程立即返回 * 返回值:成功为0,失败-1 *//********************************************************/ int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

3.删除消息队列

/***********************************************************
消息队列控制
* msqid:消息队列标识符
* cmd:IPC_RMID, IPC_STAT, IPC_SET
    IPC_RMID:删除由 msqid 指示的消息队列,将它从系统中删除并破坏相关数据结构。
    IPC_STAT:将 msqid 相关的数据结构中各个元素的当前值存入到由 buf 指向的结构中。相当于,把消息队列的属性备份到 buf 里。
    IPC_SET:将 msqid 相关的数据结构中的元素设置为由 buf 指向的结构中的对应值。相当于,消息队列原来的属性值清空,再由 buf 来替换。
* buf:存放或更改消息队列的属性
* 返回值:成功为0,失败-1
*//********************************************************/

int msgctl(int msqid, int cmd, struct msqid_ds *buf);
原文地址:https://www.cnblogs.com/blackandwhite/p/12495974.html