【rabbitmq】之Exchange

rabbitmq常用Exchange有3个,Direct,Topic,Fanout

全局配置文件

spring.rabbitmq.host=dev-mq.ttsingops.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxxxx
spring.rabbitmq.virtual-host=/cd

三个完整交换机配置

@Configuration
public class RabbitMQExchangeConfig {


    /**
     * direct sexchange 点对点  完全匹配
     */
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    /**
     * topic_exchage 点对点 规则匹配
     */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * fanout_exchage 广播
     */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";


    @Bean("directExchange")
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }

    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }

}

RabbitmqTemplate配置

@Configuration
public class RabbitMQConfig {


    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
         rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

}

DirectExchange

可以理解为发布/订阅,点对点的一种交换机,A发消息,B消费消息。是一种完全匹配的交换机

配置DirectExchange  绑定direct_queue 绑定direct_queue_key

/********************direct************************/
    @Bean("directQueue")
    public Queue directQueue() {
        return new Queue("direct_queue", true, false, false);
    }

    @Bean("directBind")
    public Binding directBind(@Autowired @Qualifier("directExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue()).to(directExchange).with("direct_queue_key");
    }

发送DirectMQ消息

@GetMapping("direct")
    public String direct(String direct) {
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.DIRECT_EXCHANGE, "direct_queue_key", direct);
        return "ok";
    }

监听direct_queue消息

@RabbitListener(queues = {"#{directQueue.name}"})
    public void directQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {

        log.info("msg:{},mq.message:{}", msg, message.toString());

    }

image

TopicExchange

TopicExchange配置

/********************topic1,************************/
    @Bean("topicQueue")
    public Queue topicQueue() {
        return new Queue("topic_queue", true, false, false);
    }
    /**
     *  * 是单个匹配
     * @param topicExchange
     * @return
     */
    @Bean("topicBind")
    public Binding topicBind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue()).to(topicExchange).with("*.queue.key");
    }

    /********************topic2************************/
    @Bean("topic2Queue")
    public Queue topic2Queue() {
        return new Queue("topic_queue", true, false, false);
    }

    /**
     *  #可以多个匹配
     * @param topicExchange
     * @return
     */
    @Bean("topic2Bind")
    public Binding topic2Bind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue()).to(topicExchange).with("#.queue.key");
    }

发送TopicExchange消息

@GetMapping("topic")
    public String topic(String topic) {
        //* 单个匹配
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, "order.queue.key", topic);
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, "bill.queue.key", topic);
        //# 多个匹配
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, "order.1.queue.key", topic);
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, "bill.1.queue.key", topic);
        return "ok";
    }

监听TopicExchange消息

@RabbitListener(queues = {"#{topicQueue.name}"})
    public void topicQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        log.info("topicQueue,msg:{},mq.message:{}", msg, message.toString());
    }


    @RabbitListener(queues = {"#{topic2Queue.name}"})
    public void topic2Queue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        log.info("topic2Queue,msg:{},mq.message:{}", msg, message.toString());

    }

image

FanoutExchange

/********************fanout************************/
    @Bean("fanoutQueue")
    public Queue fanoutQueue() {
        return new Queue("fanout_queue", true, false, false);
    }

    @Bean("fanoutBind")
    public Binding fanoutBind(@Autowired @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange);
    }
@GetMapping("fanout")
    public String fanout(String fanout) {
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.FANOUT_EXCHANGE,"",fanout);
        return "ok";
    }
@RabbitListener(queues = {"#{fanoutQueue.name}"})
    public void fanoutQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {

        log.info("msg:{},mq.message:{}", msg, message.toString());


    }

image

以上就是Rabbitmq三个常用Exchange的基本用法。

原文地址:https://www.cnblogs.com/gyjx2016/p/13622097.html