SpringBoot整合RabbitMQ之Fanout扇形交换器

接着前面的Topic交换器,我们来研究一下Fanout交换器。

Fanout交换器:

扇形模式,不处理路由键,只是简单将队列绑定到交换器,发送到交换器的每条消息都回被转发到该交换器绑定的所有队列中,类型子网传播,通过Fanout交换器转发消息是最快的

现在进入实例工程来了解一下FanoutExchange扇形交换器的用法,还是基于上一篇的实例工程上去做编码

首先编写rabbitmq-provider生产者项目

创建FanoutRabbitConfig配置类

package com.rabbitmq.provider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitConfig {

    /**
     *  创建三个队列 :fanout.A   fanout.B  fanout.C
     *  将三个队列都绑定在交换机 fanoutExchange 上
     *  因为是扇型交换机, 路由键无需配置,配置也不起作用
     */

    @Bean
    Queue queueA() {
        return new Queue("fanout.A");
    }

    @Bean
    Queue queueB() {
        return new Queue("fanout.B");
    }

    @Bean
    Queue queueC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

在Controller上多加一个接口

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage(){
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: test fanoutMessage";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        rabbitTemplate.convertAndSend("fanoutExchange",null,map);
        return "ok";
    }

好了,生产者的代码就完成了,现在去rabbitmq-consumer消费者项目上完成后续代码

创建FanoutRabbitConfig配置类 

package com.rabbitmq.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitConfig {
    /**
     *  创建三个队列 :fanout.A   fanout.B  fanout.C
     *  将三个队列都绑定在交换机 fanoutExchange 上
     *  因为是扇型交换机, 路由键无需配置,配置也不起作用
     */

    @Bean
    Queue queueA() {
        return new Queue("fanout.A");
    }

    @Bean
    Queue queueB() {
        return new Queue("fanout.B");
    }

    @Bean
    Queue queueC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

创建FanoutReceiverA、FanoutReceiverB、FanoutReceiverC

package com.rabbitmq.consumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(Map message) {
        System.out.println("FanoutReceiverA 消费者收到消息: " + message);
    }
}
package com.rabbitmq.consumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(Map message) {
        System.out.println("FanoutReceiverB 消费者收到消息: " + message);
    }
}
package com.rabbitmq.consumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(Map message) {
        System.out.println("FanoutReceiverC 消费者收到消息: " + message);
    }
}

启动这两个项目,使用postman工具调用sendFanoutMessage这个接口

可以看到只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息。

原文地址:https://www.cnblogs.com/liweixml/p/14662089.html