rabbitmq6-路由模式

一、前言:

大体的应用场景在前面都已经分析过,这里面只是最基础的rabbitmq的基础的教程的使用方法,后面我们在实际使用的过程中是会结合springboot来开展的,所以这里我们迅速的把基础过万,然后迅速进入实战的部分。但是这里建议还是把前面的相关的博文看一看,因为springboot封装了rabbitmq的一些操作,但是你要知道封装的内容是什么,不要求完全精细的掌握底层,但是也要知道个大概,这样有问题起码你能有一个明确的思路,就是google或者百度你起码能有一个关键词来搜索。

二、两种模式:

image.png
图上的两种绑定模式都是合法的:
- 相同的routing key 绑定不同的队列
- 不同的routing key 绑定同一个队列
有人可能会说前面我话的图routing key是在交换机和队列之间binding key是在消费者和队列之间,这里申明一下这个是没有问题的,因为按照Exchange的type来进行匹配的时候都是在队列里面进行匹配的,所以这在图上是没有问题的。

三、代码:

3.1、生产者:
  • 这里面我们演示图二的情况,不同的routing key绑定同一个队列。事实上进行这个实验的完整做法是在发送消息的时候传入routing key的,但是这里为了简单演示,所以直接发送两次消息。
public class Producer {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String ROUTING_KEY1 = "routing_key1";
    public static final String ROUTING_KEY2 = "routing_key2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        Channel channel = conn.createChannel();

        /**
         * 注意下面我们持久化的是交换机,已经不是持久化的队列了,交换机是不存放消息的,只是一个消息的搬运工
         */
        String exchangeType = BuiltinExchangeType.DIRECT.getType();
        boolean durable = true;
        boolean autoDelete = false;
        boolean internal = false;
        Map<String,Object> arguments = null;
        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, exchangeType,durable,autoDelete,internal,arguments);
        /**
         * 绑定一个队列,说明这个对象了对这个交换机上的数据感兴趣,下面是绑定的规则:
         *  1、同一个routing key 可以绑定多个队列
         *                ---routing key1 ----Q1
         *          P --- X
         *                ---routing key2 ----Q2
         *  2、不同的 routing key绑定一个队列
         *              ----routing key1 -----Q1
         *      P ----- X
         *              ----routing kye2 ----------|
         *                                         Q2
         *              ----routing key3 ----------
         *
         *   下面我们演示第二种,
         */
        /**
         * 下面是依据我的个人理解写出来的,如果大家有不同的意见可以留言或者加群交流
         * 在路由的模式下,生产者是不需要绑定队列的,只是声明交换机,同时交换机是无法存放消息的,
         * 假如我们直接声明交换机,然后发送消息,但是消费者还有绑定声明队列,那么这个消息就有可能丢失,所以必须先建立交换机以及队列
         * 然后生产者发送消息,消费者接受
         *
         * 这个正常的做法应该是把 routing key作为方法的参数传递过来的,这里面由于我们是做测试,就直接发送两个消息以不同的routing可以
         * 来发送
         */
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY1,null,"CEUIXCXI routing key1".getBytes());
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2,null,"CEUIXCXI routing key2".getBytes());
    }
}

希望大家能多看看我代码中的注释信息:
- 上面的持久化指的是交换机的持久化,现在我们接触到的持久化有:消息的持久化、交换机的持久化。
- 进行上面的实验,我们要先建立交换机,所以在执行的时候一定要先执行以下上面的代码进行建立交换机,但是我们发现一个问题,我们执行上述的两码,由于我们没有运行消费者的关系,我们发现在web控制台我们查看不到消息。是的,我们查看不到,大家记住交换机的作用,交换机只是消息的搬运工,既不生产消息亦不储存消息
- 记住上面我们使用的模式是Exchanage的direct交换机模式

3.2、消费者:
public class Consume001 {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String QUEUE_NAME1 = "routing_queue1";
    public static final String ROUTING_KEY1 = "routing_key1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
       channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME1,false,consumer);
    }
}
public class Consume002 {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String QUEUE_NAME1 = "routing_queue1";
    public static final String ROUTING_KEY1 = "routing_key2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME1,false,consumer);

    }
}

可以看到我们使用不同的routing key绑定了同一个队列
image.png
image.png

大概就是这样了,如果大家有疑问欢迎留言和加群讨论。

原文地址:https://www.cnblogs.com/fkxuexi/p/10674049.html