生产者与消费者模式(理解) 进程间通信之消息队列编程

                                                                           

通信之消息队列编程

1:生产者和消费者模式理解

(1)       生产者/消费者模式:需要使用到同步,以及线程,属于多并发行列,产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。 单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

(2)       解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

(3)       支持并发:生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。 使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。

(4)       支持忙闲不均:

缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

(5)       总共理解:生产者生产数据,消费者消费数据,生产者和消费者通过缓存联系起来,是通过消息队列来连接共享队列里面的东西,在队列为空的时候,消费者无法消费signal.notify() //通知生产者,反之对于生产者来是通知消费者来进行消费。

2:POSIX消息队列

(1) POSIX消息队列是独立于XSI消息队列的一套新的消息队列API,让进程可以用消息的方式进行数据交换。

//包含的头文件

#include <fcntl.h>           /* For O_* constants */

#include <sys/stat.h>        /* For mode constants */

#include <mqueue.h>

 

(2)打开或者创建消息队列。

mqd_t mq_open(const char *name, int oflag);

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

https://www.cnblogs.com/LubinLew/p/POSIX-mq_open.html

//解释 mq_open这个函数的

(3)发送或者接受消息队列。

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);

 

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);

 

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

 

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);

 

(4)int mq_close(mqd_t mqdes);  关闭一个消息队列。

我们可以使用mq_close来关闭一个消息队列,这里的关闭并非删除了相关文件,关闭之后消息队列在系统中依然存在,我们依然可以继续打开它使用。这跟文件的close和unlink的概念是类似的。

(5)int mq_unlink(const char *name);  使用mq_unlink真正删除一个消息队列。

(6)使用mq_getattr和mq_setattr来查看和设置消息队列。

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

 

(7)   mq_attr结构体是这样的结构:

struct mq_attr {

    long mq_flags;       /* 只可以通过此参数将消息队列设置为是否非阻塞O_NONBLOCK */

    long mq_maxmsg;      /* 消息队列的消息数上限 */

    long mq_msgsize;     /* 消息最大长度 */

    long mq_curmsgs;     /* 消息队列的当前消息个数 */

};

(8)//挂载消息队列

编译posix mqueue时,要连接运行时库(runtime library),既-lrt选项,

//把消息队列挂载到 /dev/mqueue 下面

gcc -lrt -lpthread recv.c //

编译的时候需要链接一些库,所以我们可以创建Makefile

CFLAGS+=-lrt –lpthread

(9)创建消息队列和接收消息队列

include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define MQNAME "/mqtest"


int main(int argc, char *argv[])
{
    mqd_t mqd;
    int ret;
    if (argc != 3) {
        fprintf(stderr, "Argument error!
");
        exit(1);
    }

mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL);

    if (mqd == -1) {
        perror("mq_open()");
        exit(1);
    }

    ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2]));
    if (ret == -1) {
        perror("mq_send()");
        exit(1);
    }

    exit(0);
}

// gcc -lrt –lpthread 动态链接库来编译 
// 存入消息队列
./send zorro 1
./send shrek 2
./send jerry 3
./send zzzzz 1
./send ssssss 2
./send jjjjj 3

cat /dev/mqueue/mqtest  查看消息队列的状态 QSIZE:31 NOTIFY:0     SIGNO:0     NOTIFY_PID:0

 

(10)接收消息:

#include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define MQNAME "/mqtest"


int main()
{

    mqd_t mqd;
    int ret;
    int val;
    char buf[BUFSIZ];

    mqd = mq_open(MQNAME, O_RDWR);
    if (mqd == -1) {
        perror("mq_open()");
        exit(1);
    }

    ret = mq_receive(mqd, buf, BUFSIZ, &val);
    if (ret == -1) {
        perror("mq_send()");
        exit(1);
    }

    ret = mq_close(mqd);
    if (ret == -1) {
        perror("mp_close()");
        exit(1);
    }

    printf("msq: %s, prio: %d
", buf, val);

    exit(0);
}

  

(10)删除这个消息队列

#include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{

    int ret;
    ret = mq_unlink(MQNAME); //删除
    if (ret == -1) {
        perror("mp_unlink()");
        exit(1);
    }
    exit(0);
}

(11)异步通知机制  使用这个机制,我们就可以让队列在由空变成不空的时候触发一个异步事件,通知调用进程,以便让进程可以在队列为空的时候不用阻塞等待。

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

#include <pthread.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>

static mqd_t mqdes;

void mq_notify_proc(int sig_num)
{
    /* mq_notify_proc()是信号处理函数,
    当队列从空变成非空时,会给本进程发送信号,
    触发本函数执行。 */

    struct mq_attr attr;
    void *buf;
    ssize_t size;
    int prio;
    struct sigevent sev;

    /* 我们约定使用SIGUSR1信号进行处理,
    在此判断发来的信号是不是SIGUSR1。 */
    if (sig_num != SIGUSR1) {
        return;
    }

    /* 取出当前队列的消息长度上限作为缓存空间大小。 */
    if (mq_getattr(mqdes, &attr) < 0) {
        perror("mq_getattr()");
        exit(1);
    }

    buf = malloc(attr.mq_msgsize);
    if (buf == NULL) {
        perror("malloc()");
        exit(1);
    }

    /* 从消息队列中接收消息。 */
    size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio);
    if (size == -1) {
        perror("mq_receive()");
        exit(1);
    }

    /* 打印消息和其优先级。 */
    printf("msq: %s, prio: %d
", buf, prio);

    free(buf);

    /* 重新注册mq_notify,以便下次可以出触发。 */
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGUSR1;
    if (mq_notify(mqdes, &sev) == -1) {
        perror("mq_notify()");
        exit(1);
    }

    return;
}

int main(int argc, char *argv[])
{
    struct sigevent sev;

    if (argc != 2) {
        fprintf(stderr, "Argument error!
");
        exit(1);
    }

    /* 注册信号处理函数。 */
    if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) {
        perror("signal()");
        exit(1);
    }

    /* 打开消息队列,注意此队列需要先创建。 */
    mqdes = mq_open(argv[1], O_RDONLY);
    if (mqdes == -1) {
        perror("mq_open()");
        exit(1);
    }

    /* 注册mq_notify。 */
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGUSR1;
    if (mq_notify(mqdes, &sev) == -1) {
        perror("mq_notify()");
        exit(1);
    }

    /* 主进程每秒打印一行x,等着从消息队列发来异步信号触发收消息。 */
    while (1) {
        printf("x
");
        sleep(1);
    }
}

  

  

 

 

 

原文地址:https://www.cnblogs.com/love-life-insist/p/10543011.html