rabbitMQ的几种工作模式及代码demo(二)-----订阅模式之广播fanout交换机

  1. 订阅模式
    • 订阅模式相比于点对点模式,多了一个交换机exchange角色。
    • 生产者会将消息发送给exchange
    • exchange一端接收消息。一端处理消息。例如,交给特定的队列、发送给所有的队列、或者将消息丢弃。
    • exchange有三种类型:Fanout广播、Direct定向、Topic通配符。
      1. Fanout:广播,将消息交给所有绑定到交换机的队列
      2. Direct:定向,把消息交给符合指定routing key 的队列
      3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    • exchange只负责转发消息,不负责存储消息。所以如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息就会被丢弃。
  2. Publish/Subscribe发布与订阅模式
    • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
    • 与点对点模式对比,一条消息如果被转发到n个队列,并且每个队列都会有消费者消费。该条消息就会被消费n次。
    • 代码层面:多了声明交换机、绑定交换机和队列的两个步骤。消费者并没有多大变化。
      image
    • 广播交换机-生产者代码
    public class Producer {
        public static void main(String[] args) throws Exception {
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
            String exchangeName = "test_fanout";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
    
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"",null,body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
        }
    }
    
    • 广播交换机-对应队列1的消费者代码
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_fanout_queue1";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
        }
    }
    
    • 广播交换机-对应队列2的消费者代码
        public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue2Name = "test_fanout_queue2";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
        }
    }
    
原文地址:https://www.cnblogs.com/rbwbear/p/15557669.html