linux进程间通信--消息队列

IPC 对象 ---- 消息队列

IPC 对象命令

(1)查看系统中IPC对象
ipcs -a 显示所有的IPC对象
ipcs -s/-q/-m

(2)删除系统中的IPC对象
ipcrm -q/-s/-m ID


1.获得key值

第一种:
key_t ftok(const char *pathname, int proj_id);
参数:
@pathname 已经存在的文件路径
@proj_id 获取这个整数的低8bit
返回值:
成功返回 key值,失败返回-1


第二种:
将key值指定为IPC_PRIVATE ,当IPC对象在亲缘关系进程通信的时候


2.创建IPC对象
int msgget(key_t key, int msgflg);
参数:
@key IPC_PRIVATE 或 ftok
@msgflg IPC_CREAT | 0666 或 IPC_CREAT | IPC_EXCL | 0666 (判断IPC对象是否存在)
返回值:
成功返回ID,失败返回-1

注意:
如果对应key值的IPC对象不存在,则创建,如果存在,直接返回IPC对象的ID

3.发送消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
@msqid 消息队列ID
@msgp 需要发送的消息存放的地址
@msgsz 消息正文的大小
@msgflg 0:阻塞的方式发送,调用阻塞直到收到一条相应类型的消息为止

   IPC_NOWAIT:非阻塞方式调用,若在消息队列中没有相应类型的消息可以接收,则函数立即返回

返回值:
成功返回0,失败返回-1

消息结构体定义:

typedef struct{
  long msg_type; //消息类型必须在第一个位置,
  char mtxt[1024];
...
}msg_t;

正文大小:sizeof(msg_t) - sizeof(long)

4.接收消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);
参数:
@msqid 消息队列ID
@msgp 存放接收到的消息
@msgsz 正文大小
@msgtyp 消息类型 , 0: 总是从消息队列中提取第一个消息
@msgflg 0:阻塞的方式接收 IPC_NOWAIT:非阻塞方式调用
返回值:
成功返回 接收消息正文大小,失败返回-1

4.删除消息队列
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数:
@msgqid 消息队列
@cmd IPC_RMID(删除消息队列)

    IPC_SET(设置消息队列的属性信息)设置消息队列的数据结构msgid_ds的ipc_perm域(IPC操作权限描述结构)值,这个值取自buf参数

           IPC_STAT(获取消息队列属性信息)读取消息队列的数据结构msgid_ds,并将其存放在buf指定的的地址中
@buf 存放消息队列属性
返回值:
成功返回0,失败返回-1

实例如下:

msg_recv.c

typedef struct{
  long msg_type;
  char mtxt[1024];
}msg_t;

#define MSG_TYPE 100
#define MSG_MTXT_LEN (sizeof(msg_t) - sizeof(long))

//./a.out filename
int main(int argc, const char *argv[])
{
  int ret;
  key_t key;
  int msgid;
  msg_t msg;

if(argc < 2){
  fprintf(stderr,"Usage : %s <filename> ",argv[0]);
  exit(EXIT_FAILURE);
  }

  key = ftok(argv[1],'k');
  if(key < 0){
    perror("Fail to ftok");
    exit(EXIT_FAILURE);
  }

  msgid = msgget(key,IPC_CREAT | 0666);
  if(msgid < 0){
    perror("Fail to msgget");
    exit(EXIT_FAILURE);
  }

while(1)
{
  ret = msgrcv(msgid,&msg,MSG_MTXT_LEN,MSG_TYPE,0);
  if(ret < 0){
    perror("Fail to msgsnd");
    exit(EXIT_FAILURE);
  }

  printf("----------------------- ");
  printf("TYPE : %ld ",msg.msg_type);
  printf("MTXT : %s ",msg.mtxt);
  printf("----------------------- ");

  if(strcmp(msg.mtxt,"quit") == 0){
    break;
  }
}

return 0;
}

msg_send.c如下:

typedef struct{
  long msg_type;
  char mtxt[1024];
}msg_t;

#define MSG_TYPE 100
#define MSG_MTXT_LEN (sizeof(msg_t) - sizeof(long))

//./a.out filename
int main(int argc, const char *argv[])
{
  int ret;
  key_t key;
  int msgid;
  msg_t msg;

  if(argc < 2){
    fprintf(stderr,"Usage : %s <filename> ",argv[0]);
    exit(EXIT_FAILURE);
  }

  key = ftok(argv[1],'k');
  if(key < 0){
    perror("Fail to ftok");
    exit(EXIT_FAILURE);
  }

  msgid = msgget(key,IPC_CREAT | 0666);
  if(msgid < 0){
    perror("Fail to msgget");
    exit(EXIT_FAILURE);
  }

  while(1)
  {
    printf("input>");
    msg.msg_type = MSG_TYPE;
    fgets(msg.mtxt,sizeof(msg.mtxt),stdin);
    msg.mtxt[strlen(msg.mtxt) - 1] = '';

    ret = msgsnd(msgid,&msg,MSG_MTXT_LEN,0);
    if(ret < 0){
      perror("Fail to msgsnd");
      exit(EXIT_FAILURE);
    }

    if(strcmp(msg.mtxt,"quit") == 0){
      break;
    }
  }

  if(msgctl(msgid,IPC_RMID,NULL) < 0){
  perror("Fail to msgctl");
  exit(EXIT_FAILURE);
  }

  return 0;
}

编译msg_send并运行结果:

编译运行msg_recv结果:

可能有人对msg_type不是很理解

通过下面的例子你可能更能理解

msg_a.c如下

typedef struct{
  long msg_type;
  char mtxt[1024];
}msg_t;

#define MSG_TYPE1 100
#define MSG_TYPE2 200

#define MSG_MTXT_LEN (sizeof(msg_t) - sizeof(long))

void recv_msg(int msgid,int msg_type)
{
  int ret;
  msg_t msg;

while(1)
{
  ret = msgrcv(msgid,&msg,MSG_MTXT_LEN,msg_type,0);
  if(ret < 0){
    perror("Fail to msgsnd");
    exit(EXIT_FAILURE);
    }

  printf("----------------------- ");
  printf("TYPE : %ld ",msg.msg_type);
  printf("MTXT : %s ",msg.mtxt);
  printf("----------------------- ");

  if(strcmp(msg.mtxt,"quit") == 0){
    break;
  }
 }
}

void send_msg(int msgid,int msg_type)
{
  int ret;
  msg_t msg;

  while(1)
  {
    printf("input>");
    msg.msg_type = msg_type;
    fgets(msg.mtxt,sizeof(msg.mtxt),stdin);
    msg.mtxt[strlen(msg.mtxt) - 1] = '';

    ret = msgsnd(msgid,&msg,MSG_MTXT_LEN,0);
    if(ret < 0){
      perror("Fail to msgsnd");
      exit(EXIT_FAILURE);
    }

    if(strcmp(msg.mtxt,"quit") == 0){
      break;
    }
  }
}

void *thread_send_msg(void *arg)
{
  int msgid = *(int *)arg;

  send_msg(msgid,MSG_TYPE1);
}

//./a.out filename
int main(int argc, const char *argv[])
{
    int ret;
    key_t key;
    int msgid;
    pthread_t tid;

    if(argc < 2){
      fprintf(stderr,"Usage : %s <filename> ",argv[0]);
      exit(EXIT_FAILURE);
    }

    key = ftok(argv[1],'k');
    if(key < 0){
      perror("Fail to ftok");
      exit(EXIT_FAILURE);
    }

    msgid = msgget(key,IPC_CREAT | 0666);
    if(msgid < 0){
      perror("Fail to msgget");
      exit(EXIT_FAILURE);
     }

    //创建进程或线程,让一个进程或线程去发送或接收消息
    ret = pthread_create(&tid,NULL,thread_send_msg,&msgid);
    if(ret != 0){
      fprintf(stderr,"Fail to pthread_create : %s! ",strerror(ret));
      exit(EXIT_FAILURE);
    }

    recv_msg(msgid,MSG_TYPE2);

    return 0;
}

msg_b.c如下

typedef struct{
  long msg_type;
  char mtxt[1024];
}msg_t;

#define MSG_TYPE1 100
#define MSG_TYPE2 200

#define MSG_MTXT_LEN (sizeof(msg_t) - sizeof(long))

void recv_msg(int msgid,int msg_type)
{
   int ret;
   msg_t msg;

  while(1)
  {
    ret = msgrcv(msgid,&msg,MSG_MTXT_LEN,msg_type,0);
    if(ret < 0){
      perror("Fail to msgsnd");
      exit(EXIT_FAILURE);
    }

    printf("----------------------- ");
    printf("TYPE : %ld ",msg.msg_type);
    printf("MTXT : %s ",msg.mtxt);
    printf("----------------------- ");

    if(strcmp(msg.mtxt,"quit") == 0){
      break;
    }
  }
}

void send_msg(int msgid,int msg_type)
{
    int ret;
    msg_t msg;

    while(1)
    {
      printf("input>");
      msg.msg_type = msg_type;
      fgets(msg.mtxt,sizeof(msg.mtxt),stdin);
      msg.mtxt[strlen(msg.mtxt) - 1] = '';

      ret = msgsnd(msgid,&msg,MSG_MTXT_LEN,0);
      if(ret < 0){
        perror("Fail to msgsnd");
        exit(EXIT_FAILURE);
      }

    if(strcmp(msg.mtxt,"quit") == 0){
        break;
      }
    }
}

void *thread_send_msg(void *arg)
{
    int msgid = *(int *)arg;

    send_msg(msgid,MSG_TYPE2);
}

//./a.out filename
int main(int argc, const char *argv[])
{
    int ret;
    key_t key;
    int msgid;
    pthread_t tid;

    if(argc < 2){
      fprintf(stderr,"Usage : %s <filename> ",argv[0]);
      exit(EXIT_FAILURE);
    }

    key = ftok(argv[1],'k');
    if(key < 0){
      perror("Fail to ftok");
      exit(EXIT_FAILURE);
    }

      msgid = msgget(key,IPC_CREAT | 0666);
    if(msgid < 0){
      perror("Fail to msgget");
      exit(EXIT_FAILURE);
     }

    //创建进程或线程,让一个进程或线程去发送或接收消息
    ret = pthread_create(&tid,NULL,thread_send_msg,&msgid);
    if(ret != 0){
      fprintf(stderr,"Fail to pthread_create : %s! ",strerror(ret));
      exit(EXIT_FAILURE);
     }

    recv_msg(msgid,MSG_TYPE1);

    return 0;
}

 功能:分别开启一个线程去发送msg_type等于100,200的消息

编译并运行msg_a.c

编译并运行msg_b.c

 

原文地址:https://www.cnblogs.com/bwbfight/p/9294451.html