rabbitmq php

connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');//连接mq
$channel = $connection->channel();//创建一个channel
$channel->queue_declare(queue[队列名称], passive[测试队列是否存在,存在成功返回,不存在返回错误(被动队列,不能临时创建,必须已经存在)],durable[消息持久化],exclusive[设置是否排他], auto_delete[]);//声明一个队列
$massage = new AMQPMessage(body[消息内容],['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);//要插入队列的数据,delivery_mode代表是否持久化
$channel->basic_publish($massage, exchange, routking_key[queue名称或者routingkey]);
//--
$callback = function ($msg) {
echo " [x] Received ", $msg->body, " ";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", " ";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),
//直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)
$channel->basic_qos(null, 1, null);
$channel->basic_consume('queue_name',consumer_tag,no_local,no_ack,excluslve,nowait,$callback);
consumer_tag [消费者标签,用来区分多个消费者]
no_local [设置为 true 则表示不能将同一个 Connection 口中生产者发送的消息传送给这个 Connection 中的消费者]
no_ack [ 是否开启自动确认机制,最好设置为false ]

exclusive [ 设置是否排他 ]   

nowait [该方法需要应答确认]
callback [ 设置消费者的回调函数。用来处理 Rabb itMQ 推送过来的消息,比如 DefaultConsumer , 使用时需要客户端重写 (override) 其中的方法 ]

自动确认机制实际上是先发出消息,然后给消息打上一个隐藏删除标志,等删除后在进行删除!
如果没收到确认消息,并且消费者已经断开链接,这mq安排这条消息从新进入队列,等待投递给下一个

手动拒绝消息:使用basic_reject(delivery_tag,requeue)
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'],false[ture(放回队列头部),false(丢弃消息)]);//消息拒绝

使用basic_nack批量处理拒绝消息
delivery_tag:消息
requeue 如果为true则消息从新进入队列,如果false,消息删除队列
$channel->basic_recover(requeue);用来请求mq从新发送还未被确认的消息。如果true则没被确认的消息会从新加入队列,false则同一条消息会被分配给与之前相同的消费者。

//---4 消息何去何从

mandatory 和 immediate 是 channel . basic_publish 方法中的两个参数,它们都有
当消息传递过程中不可达目的地时将消息返回给生产者的功能。 RabbitMQ 提供的备份交换器
(Altemate Exchange) 可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存
储起来,而不用返回给客户端。

immediate会影响镜像队列的性能,一般用TTL或DLX的方法替代。
如果发送的消息不设置mandatory参数,那么消息在未被路由的情况下会丢失,如果添加了必须加returnlistener进行监听,如果不想复杂化代码又想要不让消息丢失,
那么可以使用备份交换器,这样可以把未被路由的消息存储在rabbitmq中,在有需要的时候在去处理它。

队列可以设置TTL,这种情况下,队列中的消息都是设置的这个过期时间
如果社会之ttl为0:消息可以直接投递到消费者,否则该消息会被直接丢弃。
也可以给每条消息设置ttl,例如:
new AMQPMessage($_GET['d'], ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>5000]);
这个消息过期设置不是在过期后就直接删除,而是在队列数据读取出来的时候发现已经过期时间,在删除该消息
死信队列
当一个消息被拒绝或者消息过期或者队列达到最大长度,则消息会进入死信队列,前提是有设置死信队列。
延迟队列
延迟队列用死信队列+消失ttl实现(DLX+TTL)
优先级队列
通过x-max-priority参数来实现

数据持久化
分为:交换机持久化、队列持久化、消息持久化
注意:
将所有消息持久化会严重影响性能的,因为写入磁盘的速度比写入内存的速度慢得多
当然把交换机队列消息都设置为持久化也不一定会不丢失。
比如:在ack自动应答设置为true(自动应答),消息还没消费就宕机了。数据写入磁盘需要一段时间,在这段时间宕机。
解决办法有rabbitmq的镜像队列机制,相当于配置了副本,如果主节点master在此特殊时期内挂了,可以自动切换到从节点slave,这样可以有效的保证高可用性,除非整个集群都挂掉。

生产者确认
  事务机制
    可以使用事务进行数据添加或者用发送方确认机制,事务机制会影响性能。
  发送方确认机制
    生产者将信道设置为确认模式之后,发送消息之后会发送一条确认给生产者,这就使得生产者知晓消息已被送达到目的地,如果消息设置为可持久化,
    那么确认消息会在消息写入磁盘中之后发出。
(可批量处理)
    生产者发送消息是异步的,可以在等待确认消息送达的同时继续发送消息,如果消息没送达,会返回一个nack,生产者可以进行处理。
  注意:事务和发送确认是互斥的,不能共存。
消费者确认

消息分发
  当拥有多个消费者时,队列收到的消息将于轮询的分发方式发送给消费者。
  使用basic_qos机制可以,这个方法用来允许限制信道上的消费者所能保持的最大未确认消息数量。
  如果设置为0,则表示不设置上线。
  注意:qos的使用对拉模式的消费方式不生效。

    basic_qos:对于一个信道来说,它可以同事消费多个队列,当 prefetch_count 设置为 0 则表示
    没有上限。还有 prefetch_size 这个参数表示消费者所能接收未确认消息的总体大小的上限,
    单位为 B ,设置为 0 则表示没有上限。

消息传输保障
  
  At most once: 最多 一次。消息可能会丢失 ,但绝不会重复传输 。
  
  At least once: 最少一次。消息绝不会丢失,但可能会重复传输。

     Exactly once: 恰好 一次。每条消息肯定会被传输一次且仅传输一次。



//---------------------
<?php
/**
* Created by PhpStorm.
* User: yifanpan
* Date: 18-9-25
* Time: 下午7:31
*/

namespace AppHttpControllersApiV1Service;

use AppCoreJwtJwtAuth;
use AppHttpControllersApiApiController;
use httpEnvRequest;
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;

class TestController extends ApiController
{
public function token()
{
$clientId = 'e7f99380-ef09-11e8-950e-95ed57aef7d8';
$secret = 'bYTdW3X6ax6zSGIvvEguHbp5hXsQ5qf6O8B8H2W59QP8KW8Ef3q470UmPdWNtV4E';
$payload = [
'client_id' => $clientId
];
echo JwtAuth::encode($payload, $secret);
}

public function test()
{
$clientId = 'e7f99380-ef09-11e8-950e-95ed57aef7d8';
$secret = 'bYTdW3X6ax6zSGIvvEguHbp5hXsQ5qf6O8B8H2W59QP8KW8Ef3q470UmPdWNtV4E';
$payload = [
'client_id' => $clientId
];
echo JwtAuth::encode($payload, $secret);
}

public function getCallBack(Request $request)
{
$data = $request->all();
}

public function rabbitmqInitialization()
{
$rabbitmqObject = new AMQPStreamConnection('rabbitmq', '5672', 'guest', 'guest');
return $rabbitmqObject;
}

public function workTask()
{
//http://member-notification-service.dd01.work/rabbitmq/work_task?d=6623236
$ob = $this->rabbitmqInitialization();
$channel = $ob->channel();
$args = [
'x-message-ttl' => 5000//设置队列里面数据的过期时间
];
$channel->queue_declare('task_queue2', false, true, false, false, false, new AMQPTable($args));
$massage = new AMQPMessage($_GET['d'], ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'expiration' => 5000]);
$channel->basic_publish($massage, '', 'task_queue2');
$channel->set_return_listener(
function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
echo $replyCode;
echo "Message returned with content " . $message->body . PHP_EOL;
});
echo "send success";
}

public function fanOut()
{
//http://member-notification-service.dd01.work/rabbitmq/fanout?d=7323237623236
//发布/订阅
$exchange = 'fanout_test';
$routingKey = 'fanout_routingkey';//不需要声明,在fanout的情况下会被忽略掉
$ob = $this->rabbitmqInitialization();
$channel = $ob->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$massage = new AMQPMessage($_GET['d']);
$channel->basic_publish($massage, $exchange);
$channel->close();
$ob->close();
echo "send success";
}

public function direct()
{
//http://member-notification-service.dd01.work/rabbitmq/direct?d=7323237623236&l=info
//http://member-notification-service.dd01.work/rabbitmq/direct?d=7323237623236&l=error
//路由-直连交换机
$exchange = 'direct_test';
$ob = $this->rabbitmqInitialization();
$channel = $ob->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
$massage = new AMQPMessage($_GET['d']);
$channel->basic_publish($massage, $exchange, $_GET['l']);
echo 'send success';
$channel->close();
$ob->close();
}

public function topic()
{
//http://member-notification-service.dd01.work/rabbitmq/topic?d=wodelaitianye&l=aaa.ww.eee.error.info
//http://member-notification-service.dd01.work/rabbitmq/topic?d=santian&l=error.info
$exchange = 'topic_test';
$ob = $this->rabbitmqInitialization();
$channel = $ob->channel();
$channel->exchange_declare($exchange, 'topic', false, false, false);
$massage = new AMQPMessage($_GET['d']);
$channel->basic_publish($massage, $exchange, $_GET['l']);
echo 'send success';
$channel->close();
$ob->close();
}
}
//---------------------------------------------------

<?php

namespace AppConsoleCommands;

use IlluminateConsoleCommand;
use PhpAmqpLibConnectionAMQPStreamConnection;

class RabbitMq extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq:work {--queue=default} {--routingKey=default} {--type=default}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';

/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
switch ($this->option('type')) {
case 'work_task':
//php artisan rabbitmq:work --type=work_task
$this->workTask();
break;

case 'fanout':
//php artisan rabbitmq:work --type=fanout
//php artisan rabbitmq:work --type=fanout
//php artisan rabbitmq:work --type=fanout
//开启两个以上。多个的情况下都能接收到消息
$this->fanOut();//同时开启多个work
break;

case 'direct':
$queues = $this->option('queue');
$routingKey = $this->option('routingKey');
$this->direct($queues, $routingKey);
//php artisan rabbitmq:work --queue=test_direct_error --routingKey=error --type=direct
//php artisan rabbitmq:work --queue=test_direct_info --routingKey=info --type=direct
break;

case 'topic':
$queues = $this->option('queue');
$routingKey = $this->option('routingKey');
$this->topic($queues, $routingKey);
//php artisan rabbitmq:work --queue=test_topic_info --routingKey=#.info --type=topic
//php artisan rabbitmq:work --queue=test_topic_error --routingKey=error.* --type=topic
break;
}
}

public function workTask()
{

$channel = $this->connection()->channel();
$channel->queue_declare('test_queue2', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', " ";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, " ";
// sleep(10);
echo " [x] Done", " ";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),
//直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue2', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->connection()->close();
}

/**
* 创建一个随机队列进行监听
*/
public function fanOut()
{
$channel = $this->connection()->channel();
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'fanout_test');
echo ' [*] Waiting for logs. To exit press CTRL+C', " ";

$callback = function ($msg) {
echo ' [x] ', $msg->body, " ";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->connection()->close();
}

public function direct($queue_name, $routingKey)
{
//php artisan rabbitmq:work --queue=test_direct_error --routingKey=error
//php artisan rabbitmq:work --queue=test_direct_info --routingKey=info 无法进行多个work监听同一个队列
$exchange = 'direct_test';
$channel = $this->connection()->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
$channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange, $routingKey);
echo ' [*] Waiting for logs. To exit press CTRL+C', " ";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, " ";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->connection()->close();
}

public function topic($queue_name, $routingKey)
{
//*代表一个单词
//#代表0个或多个单词
$exchange = 'topic_test';
$channel = $this->connection()->channel();
$channel->exchange_declare($exchange, 'topic', false, false, false);
// $channel->exchange_declare($exchange, 'topic', false, false, false);
$channel->queue_declare($queue_name, false, true, false, false);
$channel->queue_bind($queue_name, $exchange, $routingKey);
echo ' [*] Waiting for logs. To exit press CTRL+C', " ";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, " ";
// sleep(50);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//消息应答
// $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'],false);//消息拒绝第二个传输true放回队列头部,false丢弃消息
;
};
//我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),
//直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->connection()->close();
}

private function connection()
{
$connection = new AMQPStreamConnection('rabbitmq', '5672', 'guest', 'guest');
return $connection;

}
}

//-------------------------rpc-server

<?php

namespace AppConsoleCommands;

use IlluminateConsoleCommand;
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class RpcRabbitMqServer extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq:rpc_server';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';

/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$this->rpc_server();
}

public function rabbitmqInitialization()
{
$rabbitmqObject = new AMQPStreamConnection('rabbitmq', '5672', 'guest', 'guest');
return $rabbitmqObject;
}

public function rpc_server()
{
$ob = $this->rabbitmqInitialization();
$channel = $ob->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
echo " [x] Awaiting RPC requests ";
$callback = function ($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ") ";

$msg = new AMQPMessage(
(string)$this->fib($n),
array('correlation_id' => $req->get('correlation_id'))
);

$req->delivery_info['channel']->basic_publish(
$msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$ob->close();

}

function fib($n)
{
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return $this->fib($n - 1) + $this->fib($n - 2);
}

}

//------------rpc-client

<?php

namespace AppConsoleCommands;

use IlluminateConsoleCommand;
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class RpcRabbitMqClient extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq:rpc_client';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';

//----------------------------
private $callback_queue;
private $response;
private $corr_id;
private $channel;
/**
* Create a new command instance.
*
* @return void
*/

public function __construct()
{
$ob = $this->rabbitmqInitialization();
$this->channel = $ob->channel();
parent::__construct();
}

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$this->rpc_client_construct();

$this->callData(30);
while (!$this->response) {
$this->channel->wait();
}
echo " [.] Got ", $this->response, " ";
}

public function rpc_client_construct()
{

list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$callBack = function ($rep){
$this->response = $rep->body;
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
};
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
$callBack);
}

public function rabbitmqInitialization()
{
$rabbitmqObject = new AMQPStreamConnection('rabbitmq', '5672', 'guest', 'guest');
return $rabbitmqObject;
}

public function callData($n)
{
$this->corr_id = uniqid();

$msg = new AMQPMessage(
$n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
}
}
//---------
 
 

原文地址:https://www.cnblogs.com/yifan72/p/10638136.html