RabbitMQ不讲武德,发个消息也这么多花招

 

前言

本篇博客已被收录GitHub:https://zhouwenxing.github.io/
文中所涉及的源码也已被收录GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模块)

使用消息队列必须要保证生产者发送的消息能被消费者所接收,那么生产者如何接收消息呢?下图是 RabbitMQ 的工作模型:

上图中生产者会将消息发送到交换机 Exchange 上,再由 Exchange 发送给不同的 Queue ,而 Queue 是用来存储消息队列,那么假如有多个生产者,那么消息发送到交换机 Exchange 之后,应该如何和 Queue 之间建立绑定关系呢?

如何使用 RabbitMQ 发送消息

RabbitMQ 中提供了3种发送消息的路由方式。

直连 Direct 模式

通过指定一个精确的绑定键来实现 Exchange(交换机) 和 Queue(消息队列) 之间的绑定,也就是说,当创建了一个直连类型的交换机时,生产者在发送消息时携带的路由键(routing key),必须与某个绑定键(binding key)完全匹配时,这条消息才会从交换机路由到满足路由关系消息队列上,然后消费者根据各自监听的队列就可以获取到消息(如下如吐所示,Queue1 绑定了 order ,那么这时候发送消息的路由键必须为 order 才能分配到 Queue1 上):

主题 Topic 模式

Direct 模式会存在一定的局限性,有时候我们需要按类型划分,比如订单类路由到一个队列,产品类路由到另一个队列,所以在 RabbitMQ 中,提供了主题模式来实现模糊匹配。使用主题类型连接方式支持两种通配符:

直连方式只能精确匹配,有时候我们需要实现模糊匹配,那么这时候就需要主题类型的连接方式,在 RabbitMQ 中,使用主题类型连接方式支持两种通配符:

  • :表示 0 个或者多个单词

  • *:表示 1 个单词

PS:使用通配符时,单词指的是用英文符号的小数点 . 隔开的字符,如:abc.def 就表示有 abcdef 两个单词。

下图所示中,因为 Queue1 绑定了 order.#,所以当发送消息的路由键为 order 或者 order.xxx时都可以使得消息分配到 Queue1 上:

广播 Fanout 模式

当我们定义了一个广播类型的交换机时就不需要指定绑定键,而且生产者发送消息到交换机上时,也不需要携带路由键,此时当消息到达交换机时,所有与其绑定的队列都会收到消息,这种模式的消息发送适用于消息通知类需求。

如下如所示,Queue1Queue2Queue3 三个队列都绑定到了一个 Fanout 交换机上,那么当 Fanout Exchange 收到消息时,会同时将消息发送给三个队列:

RabbitMQ 提供的后台管理系统中也能查询到创建的交换机和队列等信息,并且可以通过管理后台直接创建队列和交换机:

消息发送实战

下面通过一个 SpringBoot 例子来体会一下三种发送消息的方式。

  • 1、application.yml 文件中添加如下配置:
spring:
  rabbitmq:
    host: ip
    port: 5672
    username: admin
    password: 123456
  • 2、新增一个 RabbitConfig 配置类(为了节省篇幅省略了包名和导入 ),此类中声明了三个交换机和三个队列,并分别进行绑定:
@Configuration
public class RabbitConfig {
    //直连交换机
    @Bean("directExchange")
    public DirectExchange directExchange(){
        return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
    }

    //主题交换机
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
    }

    //广播交换机
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
    }


    @Bean("orderQueue")
    public Queue orderQueue(){
        return new Queue("LONGLY_WOLF_ORDER_QUEUE");
    }

    @Bean("userQueue")
    public Queue userQueue(){
        return new Queue("LONGLY_WOLF_USER_QUEUE");
    }

    @Bean("productQueue")
    public Queue productQueue(){
        return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
    }

    //Direct交换机和orderQueue绑定,绑定键为:order.detail
    @Bean
    public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
    }

    //Topic交换机和userQueue绑定,绑定键为:user.#
    @Bean
    public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
    }

    //Fanout交换机和productQueue绑定
    @Bean
    public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
  • 3、新建一个消费者 ExchangeConsumer 类,不同的方法实现分别监听不同的队列:
@Component
public class ExchangeConsumer {

    /**
     * 监听绑定了direct交换机的的消息队列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
    public void directConsumer(String msg){
        System.out.println("direct交换机收到消息:" + msg);
    }

    /**
     * 监听绑定了topic交换机的的消息队列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
    public void topicConsumer(String msg){
        System.out.println("topic交换机收到消息:" + msg);
    }

    /**
     * 监听绑定了fanout交换机的的消息队列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
    public void fanoutConsumer(String msg){
        System.out.println("fanout交换机收到消息:" + msg);
    }
}
  • 4、新增一个 RabbitExchangeController 类来作为生产者,进行消息发送:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/send/direct")
    public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/topic")
    public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/fanout")
    public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
        return "succ";
    }
}

  • 5、启动服务,当我们调用第一个接口时候,路由键和绑定键 order.detail 精确匹配时,directConsumer 就会收到消息,同样的,调用第二接口时,路由键满足 user.# 时,topicConsumer 就会收到消息,而只要调用第三个接口,不论是否指定路由键,fanoutConsumer 都会收到消息。

消息过期了怎么办

简单的发送消息我们学会了,难道这就能让我们就此止步了吗?显然是不能的,要玩就要玩高级点,所以接下来让我们给消息加点佐料。

原文地址:https://www.cnblogs.com/wjxzs/p/14236557.html