PHP与RabbitMQ(上)

一、下载

1.下载Erlang环境

https://www.erlang.org/downloads

2.下载Rabbitmq-server

https://www.rabbitmq.com/install-windows.html

 

二、安装

先安装Erlang,再安装rabbitmq-server,一直下一步即可。安装完之后使用cmd进入RabbitMQ Server文件下的sbin目录,执行:

rabbitmq-plugins enable rabbitmq_management

打开浏览器,输入localhost:15672,出现如下界面则安装成功。

默认账号:guest

默认密码:guest

我们创建个admin管理员账号:

admin管理员登录创建个test_host

三、Hello World

这里我以Yii框架为例介绍如何使用RabbitMQ,需要使用composer下载php-amqplib

composer require php-amqplib/php-amqplib

生产者:

<?php

namespace appcommands;

use yiiconsoleController;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibConnectionAMQPStreamConnection;

class ProductController extends Controller
{
    public function actionIndex()
    {
        $conn = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'test_host');
        $channel = $conn->channel();

        $msgBody = 'Hello White Rabbit!';

        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        //声明一个队列,所有的参数会在下一篇文章介绍
        $channel->queue_declare('white', false, true, false, false, false);
        //将消息发送到队列,这里并不是直接将消息发送到队列,而是通过交换机间接发送到队列
        $channel->basic_publish($msg, '', 'white');

        echo "product: send msg success
";
    }
}

控制台:

php yii product

RabbitMQ出现了一个white队列,里面有一条消息未被消费:

消费者:

<?php

namespace appcommands;

use yiiconsoleController;
use PhpAmqpLibConnectionAMQPStreamConnection;

class ConsumeController extends Controller
{
    public function actionIndex()
    {
        $conn = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'test_host');
        $channel = $conn->channel();

        $callback = function ($msg) {
            if (true) {
                echo "consume: " . $msg->body . "
";
                //确认消费
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            } else {
                //进行回滚操作
            }
        };

        /*
         * prefetch_count = 1,一次只消费一条消息
         * no_ack = false,消费成功后回复ack
         */
        $channel->basic_qos(null, 1, null);
        $channel->queue_declare('white', false, true, false, false, false);
        $channel->basic_consume('white', '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }
}

控制台:

php yii consume

此时white队列中的一条消息已经被消费:

原文地址:https://www.cnblogs.com/74percent/p/12013950.html