UNIX IPC: POSIX 消息队列

首先在我的Mac OS X上试了一下:虽然有_POSIX_MESSAGE_PASSING的宏定义,但是用gcc编译时会提示找不到mqueue.h头文件,先放一边。在Ubuntu上使用正常,不过POSIX消息队列通过ipcs命令是看不到的,需要通过如下方式进行查看:

mount -t mqueue none /mnt
ls -al /mnt

ls列出的文件即为程序中创建的POSIX消息队列,消息队列的名称需要以“/”开头否则会提示参数非法。通过cat查看这个文件可以知道这个队列的一些参数如:

hgf@ubuntu:~/ipc$ cat /mnt/testmq

QSIZE:0 NOTIFY:0 SIGNO:0 NOTIFY_PID:0

编译代码时需要加上-lrt参数,即链接POSIX.1实时(realtime)库。

基本操作API

mqd_t mq_open(const char* name, int oflag, ... /* mode_t mode, struct mq_attr* attr */);

int mq_close(mqd_t mqdes);

int mq_unlink(const char* name);

int mq_getattr(mqd_t mqdes, struct mq_attr* attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr* attr, struct mq_attr* oattr);

int mq_send(mqd_t mqdes, const char* buf, size_t len, unsigned int prior);

ssize_t mq_receive(mqd_t mqdes, char* buf, size_t len, unsigned int* priop);

mqcreate.c (gcc mqcreate.c -lrt -o mqcreate)

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <unistd.h>
#include <getopt.h>

/*
 * Attributes requested for the new queue. File-scope storage is
 * zero-initialized, so a field that is still 0 after option parsing means
 * "not given on the command line".
 */
struct mq_attr attr;

/*
 * mqcreate: create (or open) a POSIX message queue.
 *
 * Options:
 *   -e        add O_EXCL to the open flags
 *   -m N      maximum number of messages (mq_maxmsg)
 *   -z N      maximum size of one message in bytes (mq_msgsize)
 *   -n NAME   queue name (default "/testmq")
 *
 * -m and -z must be given together: mq_open() rejects an attribute
 * structure in which either field is not greater than zero. When neither is
 * given, NULL attributes are passed to mq_open().
 *
 * Returns 0 on success and EXIT_FAILURE on a usage error or when the queue
 * cannot be created.
 */
int main(int argc, char** argv) {
    /* getopt() returns int; storing it in a char breaks the -1 comparison
     * on platforms where plain char is unsigned. */
    int opt;

    const char *MQ_NAME = "/testmq";

    int OPEN_MODE = O_RDWR | O_CREAT;
    /* mode_t, because the value is read back as mode_t from the varargs. */
    mode_t FILE_MODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;

    while ((opt = getopt(argc, argv, "en:m:z:")) != -1) {
        switch (opt) {
            case 'e':
                OPEN_MODE |= O_EXCL;
                break;
            case 'm':
                attr.mq_maxmsg = atol(optarg);
                break;
            case 'z':
                attr.mq_msgsize = atol(optarg);
                break;
            case 'n':
                MQ_NAME = optarg;
                break;
            default:
                /* getopt() has already printed a diagnostic. */
                fprintf(stderr, "usage: %s [-e] [-n name] [-m maxmsg -z msgsize]\n",
                        argv[0]);
                return EXIT_FAILURE;
        }
    }

    printf("msg queue name:%s\nmsg max size: %ld\nmsg per size: %ld\n",
        MQ_NAME, attr.mq_maxmsg, attr.mq_msgsize);

    /* Pass explicit attributes only when the user asked for them. */
    struct mq_attr *attrarg = NULL;
    if (attr.mq_maxmsg != 0 || attr.mq_msgsize != 0) {
        if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0) {
            fprintf(stderr, "-m and -z must both be given with values greater than zero\n");
            return EXIT_FAILURE;
        }
        attrarg = &attr;
    }

    mqd_t mq = mq_open(MQ_NAME, OPEN_MODE, FILE_MODE, attrarg);

    if (mq == (mqd_t)-1) {
        perror("message queue create failed:");
        return EXIT_FAILURE;
    }

    /* The queue persists until mq_unlink(); only the descriptor is released. */
    if (mq_close(mq) != 0) {
        perror("close message queue failed:");
        return EXIT_FAILURE;
    }

    return 0;
}

mqunlink.c (gcc mqunlink.c -lrt -o mqunlink)

#include <stdio.h>
#include <stdlib.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>

/*
 * mqunlink: remove a POSIX message queue by name.
 *
 * The queue name is taken from argv[1] when present, otherwise "/testmq".
 *
 * Returns 0 on success and EXIT_FAILURE when mq_unlink() fails, so that
 * callers (e.g. shell scripts) can detect the error.
 */
int main(int argc, char** argv) {
    const char *MQ_NAME = (argc > 1) ? argv[1] : "/testmq";

    if (mq_unlink(MQ_NAME) != 0) {
        /* The failing call is an unlink, not a close. */
        perror("unlink message queue failed");
        return EXIT_FAILURE;
    }

    return 0;
}

mqgetattr.c

#include <stdio.h>
#include <stdlib.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>

/*
 * mqgetattr: print the attributes of an existing POSIX message queue.
 *
 * The queue name is taken from argv[1] when present, otherwise "/testmq".
 * Prints mq_maxmsg, mq_msgsize and mq_curmsgs on one line.
 *
 * Returns 0 on success and EXIT_FAILURE when the queue cannot be opened,
 * queried or closed.
 */
int main(int argc, char** argv) {
    const char *name = (argc > 1) ? argv[1] : "/testmq";

    mqd_t mq = mq_open(name, O_RDONLY);

    if (mq == (mqd_t)-1) {
        perror("open message queue failed:");
        return EXIT_FAILURE;
    }

    struct mq_attr attr;

    /* Without this check a failed query would print uninitialized fields. */
    if (mq_getattr(mq, &attr) != 0) {
        perror("get message queue attributes failed");
        mq_close(mq);
        return EXIT_FAILURE;
    }

    printf("max #msgs = %ld, max #bytes/msg = %ld, #current = %ld\n",
        attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs
    );

    if (mq_close(mq) != 0) {
        perror("close message queue failed:");
        return EXIT_FAILURE;
    }
    return 0;
}

mqsend.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>

/*
 * mqsend: send one message to an existing POSIX message queue.
 *
 * Usage: mqsend msg_queue_name message priority
 *
 * The message is sent including its terminating NUL byte, so a receiver can
 * treat the payload as a C string. The priority must be a non-negative
 * decimal integer.
 *
 * Returns 0 on success and EXIT_FAILURE on a usage error or when the queue
 * cannot be opened or the message cannot be sent.
 */
int main(int argc, char** argv) {
    if (argc != 4) {
        printf("%s msg_queue_name message priority\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* Parse the priority strictly; atoi() would silently turn garbage into 0. */
    char *end = NULL;
    unsigned long parsed = strtoul(argv[3], &end, 10);
    unsigned int prio = (unsigned int)parsed;
    if (end == argv[3] || *end != '\0' || (unsigned long)prio != parsed) {
        fprintf(stderr, "invalid priority: %s\n", argv[3]);
        return EXIT_FAILURE;
    }

    mqd_t mqd = mq_open(argv[1], O_WRONLY);

    if (mqd == (mqd_t)-1) {
        perror("open message queue failed");
        return EXIT_FAILURE;
    }

    /* Send straight from argv: no intermediate copy is needed. */
    size_t len = strlen(argv[2]) + 1;

    if (mq_send(mqd, argv[2], len, prio) != 0) {
        perror("send msg faild");
        mq_close(mqd);
        return EXIT_FAILURE;
    }

    if (mq_close(mqd) != 0) {
        perror("close message queue failed");
        return EXIT_FAILURE;
    }

    return 0;
}

mqreceive.c

#include <stdio.h>
#include <stdlib.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <getopt.h>

/*
 * mqreceive: read messages from a POSIX message queue and print them.
 *
 * Options:
 *   -n NAME   queue name (default "/testmq")
 *   -b        open the queue with O_NONBLOCK
 *   -x        accepted as an alias of -b. NOTE(review): the original option
 *             string listed 'x' while the switch handled 'b', so the
 *             non-blocking branch was unreachable; confirm which letter was
 *             intended.
 *
 * Loops forever printing each message's size, payload and priority. Any
 * mq_receive() failure ends the program; in non-blocking mode that includes
 * the queue being empty.
 *
 * Returns EXIT_FAILURE on a usage error or any failed queue operation.
 */
int main(int argc, char** argv) {
    /* getopt() returns int; storing it in a char breaks the -1 comparison
     * on platforms where plain char is unsigned. */
    int opt;

    const char *queue = "/testmq";

    int FILE_MODE = O_RDONLY;

    while ((opt = getopt(argc, argv, "bxn:")) != -1) {
        switch (opt) {
            case 'n':
                queue = optarg;
                break;
            case 'b':
            case 'x':
                FILE_MODE |= O_NONBLOCK;
                break;
            default:
                /* getopt() has already printed a diagnostic. */
                fprintf(stderr, "usage: %s [-b] [-n name]\n", argv[0]);
                return EXIT_FAILURE;
        }
    }

    mqd_t mqd = mq_open(queue, FILE_MODE);

    if (mqd == (mqd_t)-1) {
        perror("open message queue failed");
        return EXIT_FAILURE;
    }

    /* The receive buffer must be at least mq_msgsize bytes long. */
    struct mq_attr attr;
    if (mq_getattr(mqd, &attr) != 0) {
        perror("get message queue attributes failed");
        mq_close(mqd);
        return EXIT_FAILURE;
    }

    /* One extra byte so the payload can always be NUL-terminated for %s. */
    char *buffer = malloc((size_t)attr.mq_msgsize + 1);
    if (buffer == NULL) {
        perror("allocate receive buffer failed");
        mq_close(mqd);
        return EXIT_FAILURE;
    }

    for (;;) {
        unsigned int prio = 0;
        ssize_t res = mq_receive(mqd, buffer, (size_t)attr.mq_msgsize, &prio);
        if (res < 0) {
            perror("read msg fail");
            free(buffer);
            mq_close(mqd);
            return EXIT_FAILURE;
        }

        /* Senders are not required to include a terminator in the payload. */
        buffer[res] = '\0';

        printf("read msg size:%zd, msg(%s), prio:%u\n", res, buffer, prio);
        if (FILE_MODE & O_NONBLOCK) {
            sleep(1);
            printf(".");
        }
    }
}
原文地址:https://www.cnblogs.com/lailailai/p/4317031.html