rabbitmq实现一台服务器同时给指定部分的consumer发送消息(tp框架)(第五篇)

承接上一篇: http://www.cnblogs.com/spicy/p/7942521.html

背景: 前面已经实现了一个rabbitmq服务器给所有的subscrible的consumer统一发消息

目标:实现给指定部分consumer发放消息

1,之前用到下面的绑定exchange的方式,意思就是:  该队列queue对来自该exchange消息非常感兴趣

  

2,实际上这个方法课可以接受第三个参数 路由 routing_key, 意思是:一条消息会通过exchange交换机转发到路由

  

但是第三个参数也是根据exchange的类型来决定,如果fanout类型,那么就会自动忽略第三个参数,所以现在用type是direct的交换机exchange

 一条发布的消息如果routing key是 orange 就会被路由到Q1队列, 同理 routing key是black 或者 green的会被路由到Q2而其他的消息就会被丢弃掉

 

 同一个binding key可以同时绑定给多个队列,这下下图发布一条消息如果routing key是black 就会同时发给2条队列

 实验: 指定routing key 发到对应名字的队列接受,不符合的不接受,从而某些消息只发送到指定的队列中

1,tp 路由处理  下面task来发布消息, worker1 和 worker2 来消费消息

  

2,发送队列消息的方法(下面的$serverity就是指定routing key的走向)

  

public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一个exchange名字叫logs,类型是fanout
        $channel->exchange_declare('direct_logs','direct',false,false,false);

        $severity = 'info';
        $msg = new AMQPMessage('123');

        #第二个参数是表示走什么exchange  第三个参数表示走什么队列
        $channel->basic_publish($msg, 'direct_logs',$severity);//$severity包括‘info’,‘waring’,‘error’
        echo '发送完毕';

        $channel->close();
        $connection->close();
    }
View Code

3,接受消息方法1:(只接受 info,waring,error的消息)

public function worker2()
    {
        set_time_limit(0);
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一个exchange名字叫logs,类型是fanout
        $channel->exchange_declare('direct_logs','direct',false,false,false);

        #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        $severities = ['info','warning','error'];

        #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
        foreach($severities as $severity) {
            $channel->queue_bind($queue_name, 'direct_logs', $severity);
        }

        #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
        $receiver = new self();
        $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);

        while(true) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

        #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
        $receiver = new self();
        $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);

        while(true) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
View Code

4,分别开启worker1  worker2

  

5,先发一个 routing key 是info的消息

  

  发现二个worker1 worker2 都获取到了消息

  

6,再发一个 routing key 是warning的消息 (更改task方法,把里面的)

 

  现在发现 只有worker2 获取到了消息,

 7,如果发一个 routing_key 是noting的 ,会发现 worker1 和 woker2 什么都没有收到。

  

原文地址:https://www.cnblogs.com/spicy/p/7989717.html