Untitled

Untitled

PHP 进程间通信——消息队列

本文不涉及PHP基础库安装。详细安装说明,请参考官网,或期待后续博客分享。

1、消息队列函数准备

<?php
//生成一个消息队列的key
$msg_key = ftok(__FILE__, 'a');
//产生一个消息队列
$msg_queue = msg_get_queue($msg_key, 0666);
//检测一个队列是否存在 ,返回boolean值
$status = msg_queue_exists($msg_key);
//可以查看当前队列的一些详细信息
$message_queue_status = msg_stat_queue($msg_queue);

//将一条消息加入消息队列
msg_send($msg_queue, 1, "Hello, 1");
msg_send($msg_queue, 1, 'Hello, 2');
msg_send($msg_queue, 1, "Hello, 3");

//从消息队列中读取一条消息。
msg_receive($msg_queue, 1, $message_type, 1024, $message1);
msg_receive($msg_queue, 1, $message_type, 1024, $message2);
msg_receive($msg_queue, 1, $message_type, 1024, $message3);

//移除消息队列
msg_remove_queue($msg_queue);
echo $message1.PHP_EOL;
echo $message2.PHP_EOL;
echo $message3.PHP_EOL;


/**
* msg_send 有三个必选参数
* resource $queue ,
* int $msgtype ,
* mixed $message
*
* 第一个必须要是队列资源类型。resource(4) of type (sysvmsg queue)
* 第二个参数是消息类型,一个整形,且必须大于0.
* msg_send() sends a message of type msgtype (which MUST be greater than 0) to the message queue specified by queue.
* 第三个参数。是要发送的信息。可以是字符串,也可以是数组。默认会被serialize.
*/



/**
* msg_receive 的参数比较多。必须要填的参数有5个。
* resource $queue ,
* int $desiredmsgtype ,
* int &$msgtype ,
* int $maxsize ,
* mixed &$message
*
* 其中$desiredmsgtype .经过测试和官网描述不符,暂不解释。
*
* $msgtype 。这个是msg_send 中所选定的msg_type.这是一个引用参数。
* The type of the message that was received will be stored in this parameter.
*
* $maxsize。
* The maximum size of message to be accepted is specified by the maxsize;
* if the message in the queue is larger than this size the function will fail (unless you set flags as described below).
* 这个参数声明的是一个最大的消息大小,如果超过则会报错。
*
* $message.
* 上文msg_send 发送的消息类型。
*/

2、多进程通信实例

<?php
/**
* 这段代码模拟了一个日常的任务。
* 第一个父进程产生了一个子进程。子进程又作为父进程,产生10个子进程。
* 可以简化为A -> B -> c,d,e... 等进程。
* 作为A来说,只需要生产任务,然后交给B 来处理。B 则会将任务分配给10个子进程来进行处理。
*
*/


//设定脚本永不超时
set_time_limit(0);
$ftok = ftok(__FILE__, 'a');
$msg_queue = msg_get_queue($ftok);
$pidarr = [];

//产生子进程
$pid = pcntl_fork();
if ($pid) {
//父进程模拟生成一个特大的数组。
$arr = range(1,100000);

//将任务放进队里,让多个子进程并行处理
foreach ($arr as $val) {
$status = msg_send($msg_queue,1, $val);
usleep(1000);
}
$pidarr[] = $pid;
msg_remove_queue($msg_queue);
} else {
//子进程收到任务后,fork10个子进程来处理任务。
for ($i =0; $i<10; $i++) {
$childpid = pcntl_fork();
if ($childpid) {
$pidarr[] = $childpid; //收集子进程processid
} else {
while (true) {
msg_receive($msg_queue, 0, $msg_type, 1024, $message);
if (!$message) exit(0);
echo $message.PHP_EOL;
usleep(1000);
}
}
}
}

//防止主进程先于子进程退出,形成僵尸进程
while (count($pidarr) > 0) {
foreach ($pidarr as $key => $pid) {
$status = pcntl_waitpid($pid, $status);
if ($status == -1 || $status > 0) {
unset($pidarr[$key]);
}
}
sleep(1);
}
?>

以上的示例只是为了说明多进程通信的应用示例,并未在真实的项目中应用。为了示例方便,省略了很多的校验条件。但作为了解过程及原理来说,并不影响。
在执行while 循环时候,必须要使用usleep(1000) 以上。否则CPU可能会被撑爆。
以上的多进程通信,没有产生僵尸进程。得益于最后一段的while循环。
其原理在于,父进程在每次循环的时候都检测子进程是否退出。如果退出,则父进程就会回收该子进程。并且将该进程从进程列表中删除。
可以使用ps aux |grep process.php来查看当前产生的进程数量。 其中process.php 是运行的文件名
效果如下:

[root@roverliang~]# ps aux |grep php
74:root 4163 9.3 2.2 243908 22844 pts/1 S+ 17:42 0:00 php process.php
75:root 4164 0.0 0.3 229104 3924 pts/1 S+ 17:42 0:00 php process.php
76:root 4165 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
77:root 4166 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
78:root 4167 1.0 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
79:root 4168 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
80:root 4169 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
81:root 4170 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
82:root 4171 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
83:root 4172 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
84:root 4173 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php
85:root 4174 1.3 0.4 229104 4124 pts/1 S+ 17:42 0:00 php process.php

有疑问的话,可以共同讨论学习。博主也是刚学习这块,如果有什么不对的,希望能得到指点,共同提高。

原文地址:https://www.cnblogs.com/roverliang/p/6226980.html