rabbitmq实现一台服务器同时给指定部分的consumer发送消息(tp框架)(第六篇)

previous article:  http://www.cnblogs.com/spicy/p/7989717.html

上一篇学习了,发送消息的时候用direct类型的exchange,绑定不同的路由信息比如 info、warning、error 

       接受消息的时候,通过queue_bind("队列名字“,“exchange名字”,“想监听的routing”) 方法 就可以接受想监听的对应路由关键字的信息(info,warning,error)

这次,我们使用exchange 类型是topic的方式,然后路由可以更灵活的走到对应的队列

   “quick.orange.rabbit”  会走到Q1 和 Q2

    “lazy.orange.elephant” 会走到Q1 和 Q2

   "quick.orange.fox"  只走Q1

   lazy.brown.fox  只去Q2

           lazy.pink.rabbit  虽然符合二次 但是只会发送一次到Q2

   quick.brown.fox 不会去任何队列

   lazy.orange.male.rabbit 会去Q2

知识点: *表示任意一个字符  #表示所有字符

发送和接受代码代码:

 public function worker1()
    {
        set_time_limit(0);
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一个exchange名字叫logs,类型是fanout
        $channel->exchange_declare('topic_logs','topic',false,false,false);

        #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        $binding_keys  =['*'];

        #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
        foreach($binding_keys as $binding_key) {
            $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
        }

        #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
        $receiver = new self();
        $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);

        while(true) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
    public function worker2()
    {
        set_time_limit(0);
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一个exchange名字叫logs,类型是fanout
        $channel->exchange_declare('topic_logs','topic',false,false,false);

        #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        $binding_keys  = ['a.#'];

        #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
        foreach($binding_keys as $binding_key) {
            $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
        }

        #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
        $receiver = new self();
        $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);

        while(true) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一个exchange名字叫topic_logs,类型是topic
        $channel->exchange_declare('topic_logs','topic',false,false,false);

        $routing_key  = 'a.b.c';
        $msg = new AMQPMessage('4');

        #第二个参数是表示走什么exchange  第三个参数表示路由到什么队列
        $channel->basic_publish($msg, 'topic_logs',$routing_key);
        echo '发送完毕';

        $channel->close();
        $connection->close();
    }
View Code
原文地址:https://www.cnblogs.com/spicy/p/8006405.html