rabbitmq简单运用

<?php
/**
 * 生产者
 */


$connection = new AMQPConnection([
    'host' => '192.168.23.130',
    'port' => 5672,
    'login' => 'rabuser',
    'password' => '123456'
]);

$connection->connect() or die('连接失败');

try{

    $exchange_name = 'trades';
    $route_key = '/trade';
    //投递消息到中间件

    $channel = new AMQPChannel($connection);//创建消息通道

    $exchange = new AMQPExchange($channel);//通过通道连接交换几
    //设置通道名称
    $exchange->setName($exchange_name);

    $data = json_encode(['time'=>time()]);
    //发布消息到交换机中
    $exchange->publish($data,$route_key);


}catch (AMQPChannelException $e){
    var_dump($e);
}

  

<?php
/**
 * 消费者
 */

$connection = new AMQPConnection([
    'host' => '192.168.23.130',
    'port' => 5672,
    'login' => 'rabuser',
    'password' => '123456'
]);

$connection->connect() or die('连接失败');


try{

    $exchange_name = 'trades';
    $route_key = '/trade';
    $queue_name = 'queue';
    //投递消息到中间件

    $channel = new AMQPChannel($connection);//创建消息通道

    $exchange = new AMQPExchange($channel);//通过通道连接交换几
    //设置通道名称
    $exchange->setName($exchange_name);
    //三种获取消息的模式,直连模式,主题模式,广播模式

    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    //声明
    $exchange->declareExchange();

    //声明队列绑定交换机路由
    $queue = new AMQPQueue($channel);
    $queue->setName($queue_name);
    $queue->declareQueue();

    //绑定监听获取数据
    $queue->bind($exchange_name,$route_key);

    //消费数据,默认阻塞监听获取数据
    $queue->consume(function ($event,$queue){
        //获取数据
        $msg = $event->getBody();

        var_dump($msg);
        var_dump($queue);
        //回应ACK
        $queue->ack($event->getDeliveryTag());
    });

}catch (AMQPChannelException $e){
    var_dump($e);
}

  

原文地址:https://www.cnblogs.com/pangxiaox/p/11070926.html