SpringAMQP 发布订阅--广播 Fanout

实现思路如下:
在consumer服务中,利用代码声明队列、交换机,并将两者绑定
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
在publisher中编写测试方法,向fanout发送消息

在consumer服务声明FanoutExchange、Queue、Binding

@Configuration
public class FanoutConfig {

    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout");
    }

    // 声明第1个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    //绑 定队列1和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }


    // 声明第2个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    //绑 定队列2和交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
View Code

在consumer服务声明两个消费者

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("listenFanoutQueue1 消费者接收到消息 :【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.err.println("listenFanoutQueue2 消费者接收到消息 :【" + msg + "】");
    }

publisher服务发送消息到FanoutExchange

    @Test
    public void testFanoutQueue() throws InterruptedException {
        String queueName = "marw.fanout";
        String message = "hello, fanout queue message";
        rabbitTemplate.convertAndSend(queueName, "", message);
    }

交换机的作用

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列
原文地址:https://www.cnblogs.com/WarBlog/p/15476656.html