rabbitMQ的几种工作模式及代码demo(四)订阅模式之通配符topic交换机

  1. Topics通配符模式
    • 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
    • topic模式是direct模式的升级版,可以对routing key使用通配符。
    • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    • 通配符规则:
      • #:匹配一个或多个词
      • *:匹配不多不少恰好1个词
    • 举例:
      • item.#:能够匹配item.insert.abc 或者 item.insert
      • item.*:只能匹配item.insert
    • 应用场景:相比如direct模式,可以更适应复杂的策略业务环境。
    • 代码层面:除了通配符,基本无差别
      image
    • 生产者代码:
    public class Producer {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String exchangeName = "test_topic";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
            String queue1Name = "test_topic_queue1";
            String queue2Name = "test_topic_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            // 绑定队列和交换机
            /**
             *  参数:
                 1. queue:队列名称
                 2. exchange:交换机名称
                 3. routingKey:路由键,绑定规则
                     如果交换机的类型为fanout ,routingKey设置为""
             */
            // routing key  系统的名称.日志的级别。
            //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
            channel.queueBind(queue1Name,exchangeName,"#.error");
            channel.queueBind(queue1Name,exchangeName,"order.*");
            channel.queueBind(queue2Name,exchangeName,"*.*");
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //发送消息goods.info,goods.error
            channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
            channel.close();
            connection.close();
        }
    }
    
    • 消费者1代码:
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_topic_queue1";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
        }
    }
    
    • 消费者2代码:
    public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue2Name = "test_topic_queue2";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
        }
    }
    
原文地址:https://www.cnblogs.com/rbwbear/p/15557680.html