RabbitMQ交换机

RabbitMQ中,生产者并不是直接将消息发送给queue,而是先将消息发送给exchange,再由exchange通过不同的路由规则将消息路由到绑定的队列中进行存储,那么为什么要先将消息发送给exchange而不是直接发送给queue呢?

理解Exchange

为什么要在生产者和queue之间多一个exchange呢?

我们知道RabbitMQ是AMQP协议的一个实现,生产者和消费者解耦合是AMQP协议的核心思想,生产者不需要知道消息被发送到哪些队列中,只需要将消息发送到exchange即可。先由exchange来接收生产者发送的消息,然后exchange按照routing-key和Binding规则将消息路由到对应的队列中,exchange就相当于一个分发消息的交换机。

在这种模式下,生产者只面向exhange,exchange根据routing-key和binding路由消息到queue,消费者只面向对应的queue,以此来将消息传递的各个层面拆分开,从而降低整体的耦合度。

理解Routing-Key和Binding

exchange收到生产者发送的消息后,如何路由到queue呢,此时就需要用到routing-key和binding。

binding:exchange和queue之间的关系,也就是说使用binding关联的队列只对当前交换机上消息感兴趣。

routing-key:在绑定exchange和queue时可以添加routing-key,routing-key是一个消息的一个属性,这个属性决定了交换机如何将消息路由到队列。

可以说,binding和routing-key一起决定了exchange将消息路由到哪些队列中,当然路由的算法还取决于exchange的类型。

Exchange类型

exchange主要有以下几种分类: fanout exchange、direct exchange、topic exchange、headers exchange,我们主要介绍前面三种交换机。

Fanout Exchange

fanout exchange也可以叫做扇形交换机,示意图如下:

特点:

发布消息时routing-key被忽略
生产者发送到exchange中的消息会被路由到所有绑定的队列中

由于扇形交换机会将消息路由给所有绑定的队列的特性,扇形交换机是作为广播路由的理想选择。

应用场景:

对同样的消息做不同的操作,比如同样的数据,既要存数据库,又要存储到磁盘。

代码示例:

  • 生产者发送消息到交换机
@Service
public class Producer {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void publishMessage(){
        for(int i = 0; i < 100; i++){
            rabbitTemplate.convertAndSend(exchangeName,"","发布消息========>"+i);
        }
    }
}

convertAndSend方法的第二个参数就是routing-key,此时设置为空字符串即可。

  • 消费端声明队列、交换机以及绑定
@Configuration
public class ConsumerConfig {
    /**
     * 交换机名称
     */
    @Value("${platform.exchange-name}")
    private String exchangeName;

    /**
     * 消费者队列名称(指定队列)
     */
    @Value("${platform.consumer-queue-name}")
    private String queueName;

    /**
     * 声明持久化队列
     * @return
     */
    @Bean
    public Queue consumerQueue(){
        return  new Queue(queueName,true);
    }

    /**
     * 声明扇形交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(exchangeName);
    }

    /**
     * 声明队列和交换机的绑定
     * @param queue
     * @param myexchange
     * @return
     */
    @Bean
    public Binding binding(Queue queue, FanoutExchange myexchange) {
        return BindingBuilder.bind(queue).to(myexchange);
    }
}

上述声明中,篇幅所限只声明了一个队列,生产使用时可以声明多个队列,并且和交换机进行绑定。

声明完队列、交换机以及绑定之后就可以启动生产者和消费者发送消息,此时就可以看到同样的消息发送到了多个绑定的队列中。

具体代码可以参考码云fanout生产者fanout消费者

Direct Exchange

直连交换机,RabbitMQ默认的交换机就是直连交换机,示意图如下所示:

特点:

生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中。

应用场景:

直连交换机通常用来循环分发任务给多个workers,例如在一个日志处理系统中,一个worker处理error级别日志,另外一个worker用来处理info级别的日志,此时生产者只需要在发送时指定特定的routing-key即可,绑定队列时binding-key只需要和routing-key保持一致即可接收到特定的消息。

代码实现:

  • 生产者发送消息:
@Service
public class Producer {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void publishMessage(){
        for(int i = 0; i < 100; i++){
            rabbitTemplate.convertAndSend(exchangeName,"log.error","发布到绑定routing-key是log.error的队列"+i);
        }

        for (int i = 100; i < 200; i++) {
            rabbitTemplate.convertAndSend(exchangeName,"log.debug","发布到绑定routing-key是log.debug的队列"+i);
        }
    }
}
  • 声明队列、交换机以及绑定:
@Configuration
public class ConsumerConfig {
    /**
     * 交换机名称
     */
    @Value("${platform.exchange-name}")
    private String exchangeName;

    /**
     * 主题名称
     */
    @Value("${platform.exchange-routing-key}")
    private String bindingKey;

    /**
     * 消费者队列名称(指定队列)
     */
    @Value("${platform.consumer-queue-name}")
    private String queueName;

    @Bean
    public Queue consumerQueue(){
        return  new Queue(queueName,true);
    }

    /**
     * 声明直连交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(exchangeName);
    }

    /**
     * 绑定队列到直连交换机
     * @param queue 队列
     * @param myexchange 直连交换机
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange myexchange) {
        return BindingBuilder.bind(queue).to(myexchange).with(bindingKey);
    }
}

使用不同的binding-key绑定队列到直连交换机,发送消息时只需要指定对应的routing-key就可以将消息发送到对应的队列中,此时启动生产者和消费者,发送消息后就可以看到不同的数据进入了对应的队列中,更多代码请参考码云direct生产者direct消费者

扩展:

前面说到RabbitMQ使用的默认交换机是直连交换机,此处我们从源码上来确认一下,代码入口如下所示:

rabbitTemplate.convertAndSend(queueName,"消息"+i);

点进convertAndSend方法后可以看到如下所示的代码:

@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {
	convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}

可以看到此处给了一个exchange参数,在当前类中可以找到这个exchange参数对应的声明:

private String exchange = DEFAULT_EXCHANGE;

/** Alias for amq.direct default exchange. */
private static final String DEFAULT_EXCHANGE = "";

从DEFAULT_EXCHANGE的注释可以看出来默认的交换机是直连交换机。

默认交换机中的routing-key是队列的名称,当队列没有明确指定绑定到某个交换机上时,默认会以队列名称作为binding-key绑定到默认交换机上,因为发送消息时的routing-key是队列名称,队列绑定默认交换机时的binding-key也是队列名称,因此默认交换机会将消息路由到对应的队列中。

Topic Exchange

主题交换机,一种支持灵活配置routing-key的交换机,示意图如下所示:

特点:

routing-key必须由多个单词或者通配符组成,单词或者通配符之间使用.隔开,上限为255个字节;
*通配符只能匹配一个单词;
#通配符可以匹配零个或者多个单词;
队列绑定交换机时的binding-key要能够匹配发送消息时的routing-key才能将消息路由到对应的队列;
根据routing-key和binding-key的匹配情况,消息可能进入单个队列,也可能进入多个队列,也可能丢失;
主题队列的routing-key设置为#时,表示所有所有的队列都可以接收到消息,相当于fanout交换机;
主题队列的routing-key中不包含#或者*时,表示指定队列可以接收到消息,相当于direct交换机;

匹配例子:

routing-key binding-key 是否匹配
*.orange.* quick.orange.rabbit true
*.orange.* quick.red.rabbit false
*.*.rabbit quick.red.rabbit true
*.*.rabbit a.quick.red.rabbit false
lazy.# lazy.red.rabbit true
lazy.# lazy.red.rabbit.a.b true

应用场景:

由多个workers完成的后台任务,每个worker负责处理特定的任务;
涉及分类或者标签的数据处理;
云端不同种类服务的协调;

代码实现:

  • 生产者发送数据:
@Service
public class Producer {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void publishMessage(){
        for(int i = 0; i < 100; i++){
            if(i%2==0){
                rabbitTemplate.convertAndSend(exchangeName,"gz.log.error","消息==>"+i);
            }else{
                rabbitTemplate.convertAndSend(exchangeName,"zj.log.info.a","消息==>"+i);
            }
        }
    }
}
  • 声明队列、交换机以及绑定:
@Configuration
public class ConsumerConfig {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Value("${platform.consumer-queue-name}")
    private String queueName;

    /**
     * gz.*.* | *.log.#
     */
    @Value("${platform.exchange-routing-key}")
    private String bindingKey;

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(exchangeName);
    }

    @Bean
    public Queue consumerQueue(){
        return  new Queue(queueName,true);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
    }
}

上述声明完成以后,可以在rabbitmq的管理页面查看到如下所示的结果:

生产者设置的routing-key是gz.log.error和zj.log.info.a,两个队列的binding-key分别为gz.*.* 和*.log.#,gz.*.* 只能匹配gz.log.error,*.log.#可以匹配两个routing-key,因此绑定的两个队列,一个可以获取到全部数据,一个只能获取到部分数据,结果如下:

具体代码实现参考码云topic生产者topic消费者

总结

上面主要介绍三种类型的交换机,fanout交换机忽略routing-key,可以将消息发送到所有绑定的队列中,direct交换机需要指定routing-key,且必须和binding-key完全一致才可以发送消息到绑定队列中,最灵活的则为topic交换机,可以通过通配符的方式进行匹配,根据匹配结果将消息发送到不同队列中,其实还有header交换机,不过应用较少且本人也未进行研究过,此处忽略不记。

原文地址:https://www.cnblogs.com/ybyn/p/13690991.html