Java之RabbitMQ(二)多mq配置

场景:

Spring Boot 单体项目自身使用 MQ 中间件处理部分业务,同时某些业务又需要消费第三方的 MQ 消息,因此需要在同一个项目中配置多套 MQ:为每一套 MQ 分别自定义连接工厂(ConnectionFactory)、模板(RabbitTemplate)、监听容器工厂(ListenerContainerFactory)以及管理组件(RabbitAdmin)。具体实现如下:

实现:

1.配置文件:application.yml

# Two independent RabbitMQ connection profiles under custom prefixes.
# NOTE(review): presumably each key maps to a field on AbstractRabbitConfig
# (not shown in this article) - confirm the property names match.
spring:
  rabbitmq:
    # Read by FirstRabbitConfig via @ConfigurationProperties("spring.rabbitmq.first").
    first:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtualHost: /channel-demo
    # Read by SecondRabbitConfig via @ConfigurationProperties("spring.rabbitmq.second").
    second:
      host: 100.100.100.184
      port: 5672
      username: guest
      password: guest
      virtualHost: /xxx-test
    

2.配置类(两个配置类都继承自 AbstractRabbitConfig,并通过 super.connectionFactory() 创建连接工厂;该基类的代码本文未给出):

FirstRabbitConfig配置类:
package com.xxx.channe.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Spring configuration for the first RabbitMQ broker.
 *
 * <p>Registers the connection factory, template, listener container factory and
 * admin for the {@code spring.rabbitmq.first} property prefix, plus the
 * exchange, queue and binding that belong to this broker. The connection
 * factory and the template are marked {@link Primary}.
 *
 * <p>The actual connection factory is produced by
 * {@code AbstractRabbitConfig#connectionFactory()}, which is defined elsewhere.
 *
 * @author mumu
 * @since 2019/12/6
 */
@Configuration
@ConfigurationProperties("spring.rabbitmq.first")
public class FirstRabbitConfig extends AbstractRabbitConfig {

    /**
     * Exposes the parent-built connection factory as the primary one.
     *
     * @return the connection factory for the first broker
     */
    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory() {
        return super.connectionFactory();
    }

    /**
     * Primary template for publishing to the first broker.
     *
     * @param connectionFactory the first broker's connection factory
     * @return a template bound to {@code connectionFactory}
     */
    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate rabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Listener container factory to reference from
     * {@code @RabbitListener(containerFactory = "firstFactory")}.
     *
     * @param configurer        Spring Boot's container factory configurer
     * @param connectionFactory the first broker's connection factory
     * @return the configured listener container factory
     */
    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                             @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Admin that manages topology on the first broker.
     *
     * @param connectionFactory the first broker's connection factory
     * @return an admin bound to {@code connectionFactory}
     */
    @Bean(value = "firstRabbitAdmin")
    public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Direct exchange of the first broker.
     *
     * @return the exchange, restricted to being declared by the first admin
     */
    @Bean
    public DirectExchange firstDirectExchange() {
        DirectExchange exchange = new DirectExchange("first-direct-exchange");
        // Without this restriction every RabbitAdmin in the context (including the
        // one for the second broker) would declare this exchange on its own broker.
        exchange.setAdminsThatShouldDeclare(owningAdmin());
        return exchange;
    }

    /**
     * Queue of the first broker.
     *
     * @return the queue, restricted to being declared by the first admin
     */
    @Bean
    public Queue firstQueue() {
        Queue queue = new Queue("first-queue");
        queue.setAdminsThatShouldDeclare(owningAdmin());
        return queue;
    }

    /**
     * Binds {@code first-queue} to {@code first-direct-exchange} with
     * {@code first-routing-key}.
     *
     * @return the binding, restricted to being declared by the first admin
     */
    @Bean
    public Binding firstBinding() {
        Binding binding = BindingBuilder.bind(firstQueue()).to(firstDirectExchange()).with("first-routing-key");
        binding.setAdminsThatShouldDeclare(owningAdmin());
        return binding;
    }

    /**
     * Resolves the admin bean of this broker through inter-bean method calls,
     * the same mechanism {@link #firstBinding()} uses for the queue and exchange.
     */
    private RabbitAdmin owningAdmin() {
        return firstRabbitAdmin(firstConnectionFactory());
    }
}
SecondRabbitConfig配置类:
package com.xxx.channe.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the second RabbitMQ broker.
 *
 * <p>Registers the connection factory, template, listener container factory and
 * admin for the {@code spring.rabbitmq.second} property prefix. None of these
 * beans is primary, so they must be injected by qualifier. The connection
 * factory itself comes from {@code AbstractRabbitConfig#connectionFactory()},
 * which is defined elsewhere.
 *
 * @author mumu
 * @since 2019/12/6
 */
@Configuration
@ConfigurationProperties("spring.rabbitmq.second")
public class SecondRabbitConfig extends AbstractRabbitConfig {

    /** Exposes the parent-built connection factory for the second broker. */
    @Bean("secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory() {
        return super.connectionFactory();
    }

    /** Template for publishing to the second broker. */
    @Bean("secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Listener container factory to reference from
     * {@code @RabbitListener(containerFactory = "secondFactory")}.
     */
    @Bean("secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(listenerFactory, connectionFactory);
        return listenerFactory;
    }

    /** Admin bound to the second broker's connection factory. */
    @Bean("secondRabbitAdmin")
    public RabbitAdmin secondRabbitAdmin(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        return admin;
    }

    // Example topology for the second broker, intentionally left disabled:
    //
    //    @Bean
    //    public DirectExchange secondDirectExchange() {
    //        return new DirectExchange("second-direct-exchange");
    //    }
    //
    //    @Bean
    //    public Queue secondQueue() {
    //        return new Queue("second-queue");
    //    }
    //
    //    @Bean
    //    public Binding secondBinding() {
    //        return BindingBuilder.bind(secondQueue()).to(secondDirectExchange()).with("second-routing-key");
    //    }
}

3.生产者:

package com.xxx.channe.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Sample producer that publishes one text message through the template
 * qualified as {@code firstRabbitTemplate}.
 *
 * @author mumu
 * @since 2019/12/6
 */
@Slf4j
@Component
public class MqTest {

    /** Exchange the sample message is sent to. */
    private static final String EXCHANGE = "first-direct-exchange";

    /** Routing key used for the sample message. */
    private static final String ROUTING_KEY = "first-routing-key";

    @Autowired
    @Qualifier("firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    /**
     * Converts and sends the sample payload to {@code first-direct-exchange}
     * with {@code first-routing-key}, then logs that the send call returned.
     */
    public void send() {
        String payload = "我是 first。。。";
        firstRabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload);
        log.info("消息发送成功...");
    }

}

4.消费者:

package com.xxx.channe.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Sample consumer that listens on {@code first-queue} using the listener
 * container factory registered as {@code firstFactory}.
 *
 * <p>NOTE(review): this class has the same simple name and package as the
 * producer example; rename one of them if both are placed in the same module.
 *
 * @author mumu
 * @since 2019/12/6
 */
@Slf4j
@Component
public class MqTest {

    /**
     * Invoked for each message delivered from {@code first-queue}; the message
     * content is not inspected, only the receipt is logged.
     */
    // A method-level @RabbitListener is sufficient; @RabbitHandler is only meant
    // for methods inside a class that carries a class-level @RabbitListener.
    @RabbitListener(queues = {"first-queue"}, containerFactory = "firstFactory")
    public void firstTest() {
        log.info("hello, first mq收到消息");
    }

}

未完待续...

原文地址:https://www.cnblogs.com/kobe-lin/p/12009459.html