封装rabbitmq

今天又抽时间用 PHP 封装了 RabbitMQ,使用的框架是 Yaf。

Consumer如下:

<?php

namespace RabbitMq;
/**
 * Blocking RabbitMQ consumer built on the php-amqp (ext-amqp) extension.
 *
 * Usage: construct the object (which opens the connection), then call
 * createChannel() followed by createQueue(). createQueue() binds the queue
 * and then consumes messages in an endless loop, so it does not return
 * normally.
 *
 * The AMQP* classes live in the global namespace, so they are referenced
 * with a leading backslash; without it PHP would look for
 * RabbitMq\AMQPConnection etc. and fail with "class not found".
 */
class Consumer
{
    /** Name of the exchange the queue is bound to. */
    public $exchange_name = "exchange_1";
    /** Routing key used for the binding. */
    public $route_name = "route_1";
    /** Name of the queue to consume from. */
    public $queue_name = "queue_1";
    /** @var \AMQPConnection|null Set by init(). */
    public $conn = null;
    /** @var \AMQPChannel|null Set by createChannel(). */
    public $channel = null;
    /** @var \AMQPExchange|null Set by createChannel(). */
    public $exchange = null;
    /** @var \AMQPQueue|null Set by createQueue(). */
    public $queue = null;

    /**
     * Stores the optional names and opens the broker connection.
     *
     * An empty string for any argument keeps the corresponding default.
     * The channel and queue are NOT created here; callers must invoke
     * createChannel() and createQueue() themselves.
     *
     * @param string $exchange_name Exchange name, or "" for the default.
     * @param string $route_name    Routing key, or "" for the default.
     * @param string $queue_name    Queue name, or "" for the default.
     */
    public function __construct(string $exchange_name = "", string $route_name = "", string $queue_name = "")
    {
        // Compare against '' rather than relying on truthiness, so that a
        // name such as "0" is not silently replaced by the default.
        if ($exchange_name !== '') {
            $this->exchange_name = $exchange_name;
        }
        if ($route_name !== '') {
            $this->route_name = $route_name;
        }
        if ($queue_name !== '') {
            $this->queue_name = $queue_name;
        }
        $this->init();
    }

    /**
     * Creates the connection object from MqConfig::$config and connects.
     *
     * Terminates the script with a message if connect() reports failure.
     * NOTE(review): MqConfig is resolved inside the RabbitMq namespace and is
     * not defined in this file - confirm it exists as RabbitMq\MqConfig.
     */
    public function init()
    {
        $this->conn = new \AMQPConnection(MqConfig::$config);
        if (!$this->conn->connect()) {
            die("Cannot connect to the broker!\n");
        }
    }

    /**
     * Opens a channel on the connection and configures the exchange handle
     * as a durable direct exchange.
     *
     * No declare call is made here, so this only sets up the local object;
     * presumably the exchange is expected to exist on the broker already.
     */
    public function createChannel()
    {
        $this->channel = new \AMQPChannel($this->conn);

        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
        $this->exchange->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
        $this->exchange->setFlags(AMQP_DURABLE);       // durable
    }

    /**
     * Configures the durable queue handle, binds it to the exchange with the
     * routing key, and then consumes messages forever.
     *
     * Each message body is echoed followed by a newline and then
     * acknowledged manually. The queue itself is not declared here.
     * The connection is closed if the consume loop is left via an exception.
     */
    public function createQueue()
    {
        $this->queue = new \AMQPQueue($this->channel);
        $this->queue->setName($this->queue_name);
        $this->queue->setFlags(AMQP_DURABLE); // durable

        // Bind the queue to the exchange using the routing key.
        echo 'Queue Bind: ' . $this->queue->bind($this->exchange_name, $this->route_name) . "\n";

        // Receive messages in blocking mode.
        echo "接收到的消息:\n";
        try {
            while (true) {
                $this->queue->consume(static function ($envelope, $queue) {
                    $msg = $envelope->getBody();
                    echo $msg . "\n"; // handle the message
                    $queue->ack($envelope->getDeliveryTag()); // manual ACK
                });
            }
        } finally {
            // The loop above has no normal exit, so this is the only place
            // where the disconnect can actually run.
            $this->conn->disconnect();
        }
    }
}
Publisher如下:
<?php

namespace RabbitMq;
/**
 * Simple RabbitMQ publisher built on the php-amqp (ext-amqp) extension.
 *
 * Usage: construct the object (which opens the connection), call
 * createChannel(), then publishMsg().
 *
 * The AMQP* classes live in the global namespace, so they are referenced
 * with a leading backslash; without it PHP would look for
 * RabbitMq\AMQPConnection etc. and fail with "class not found".
 */
class Publisher
{
    /** Name of the exchange messages are published to. */
    public $exchange_name = "exchange_1";
    /** Routing key attached to every published message. */
    public $route_name = "route_1";
    /** @var \AMQPConnection|null Set by init(). */
    public $conn = null;
    /** @var \AMQPChannel|null Set by createChannel(). */
    public $channel = null;
    /** @var \AMQPExchange|null Set by createChannel(). */
    public $exchange = null;

    /**
     * Stores the optional names and opens the broker connection.
     *
     * An empty string for either argument keeps the corresponding default.
     *
     * @param string $exchange_name Exchange name, or "" for the default.
     * @param string $route_name    Routing key, or "" for the default.
     */
    public function __construct(string $exchange_name = "", string $route_name = "")
    {
        // Compare against '' rather than relying on truthiness, so that a
        // name such as "0" is not silently replaced by the default.
        if ($exchange_name !== '') {
            $this->exchange_name = $exchange_name;
        }
        if ($route_name !== '') {
            $this->route_name = $route_name;
        }
        $this->init();
    }

    /**
     * Creates the connection object from MqConfig::$config and connects.
     *
     * Terminates the script with a message if connect() reports failure.
     * NOTE(review): MqConfig is resolved inside the RabbitMq namespace and is
     * not defined in this file - confirm it exists as RabbitMq\MqConfig.
     */
    public function init()
    {
        $this->conn = new \AMQPConnection(MqConfig::$config);
        if (!$this->conn->connect()) {
            die("Cannot connect to the broker!\n");
        }
    }

    /**
     * Opens a channel on the connection and creates the exchange handle
     * with the configured name. No type, flags or declare call are set here.
     */
    public function createChannel()
    {
        $this->channel = new \AMQPChannel($this->conn);
        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
    }

    /**
     * Publishes five test messages, one per second, then disconnects.
     *
     * Each message is a fixed greeting followed by the current time; the
     * result of publish() is echoed after a fixed prefix. The connection is
     * closed even when publish() throws.
     */
    public function publishMsg()
    {
        try {
            for ($i = 0; $i < 5; ++$i) {
                sleep(1); // wait one second between messages
                $message = "测试消息,你好啊!" . date("h:i:s");
                echo "发送消息:哈哈哈:" . $this->exchange->publish($message, $this->route_name) . "\n";
            }
        } finally {
            $this->conn->disconnect();
        }
    }
}

简单调用:

调用consumer:

// The class is declared as `Consumer` inside `namespace RabbitMq`, so its
// fully-qualified name is \RabbitMq\Consumer (not RabbitMqConsumer).
$consumer = new \RabbitMq\Consumer();
$consumer->createChannel();
$consumer->createQueue(); // blocks and consumes messages

调用Publisher:

// The class is declared as `Publisher` inside `namespace RabbitMq`, so its
// fully-qualified name is \RabbitMq\Publisher (not RabbitMqPublisher).
$publisher = new \RabbitMq\Publisher();
$publisher->createChannel();
$publisher->publishMsg();
原文地址:https://www.cnblogs.com/allen-spot/p/11439471.html