Springboot配置多数据源Rabbitmq

  在上一篇我们提到了如何在Springboot项目中搭建单个Rabbitmq,但是当我们遇到多数据源Rabbitmq的时候,需要怎么做呢?

  我们首先看application.yml文件

spring:
  rabbitmq:
    rabbitmq:
    cart-order:
      virtual-host: /
      host: localhost
      port: 5672
      username: guest
      password: guest
    order-adaptor:
      virtual-host: /
      host: localhost
      port: 5672
      username: test
      password: 123456  
    listener:
      simple:
        concurrency: 10
        max-concurrency: 20
        prefetch: 5

mq:
  env: test

cart:
  place:
    order:
      queue: ${mq.env}.cart.place.order.queue
      exchange: ${mq.env}.cart.place.order.exchange
      routing:
        key: ${mq.env}.cart.place.order.routing.key  

ticketing:
  place:
    order:
      queue: ${mq.env}.ticketing.place.order.queue
      exchange: ${mq.env}.ticketing.place.order.exchange
      routing:
        key: ${mq.env}.ticketing.place.order.routing.key  

然后主要修改的部分还是在RabbitmqConfig的修改

@Configuration
public class RabbitmqConfig {
    private static final Logger logger = LogManager.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment environment;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean(name = "cartOrderCachingConnectionFactory")
    @Primary
    public CachingConnectionFactory cartOrderCachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(environment.getProperty("spring.rabbitmq.cart-order.host"));
        cachingConnectionFactory.setPort(environment.getProperty("spring.rabbitmq.cart-order.port", int.class));
        cachingConnectionFactory.setUsername(environment.getProperty("spring.rabbitmq.cart-order.username"));
        cachingConnectionFactory.setPassword(environment.getProperty("spring.rabbitmq.cart-order.password"));
        cachingConnectionFactory.setVirtualHost(environment.getProperty("spring.rabbitmq.cart-order.virtual-host"));
        cachingConnectionFactory.setPublisherReturns(true);
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean(name = "orderAdaptorCachingConnectionFactory")
    public CachingConnectionFactory orderAdaptorCachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(environment.getProperty("spring.rabbitmq.order-adaptor.host"));
        cachingConnectionFactory.setPort(environment.getProperty("spring.rabbitmq.order-adaptor.port", int.class));
        cachingConnectionFactory.setUsername(environment.getProperty("spring.rabbitmq.order-adaptor.username"));
        cachingConnectionFactory.setPassword(environment.getProperty("spring.rabbitmq.order-adaptor.password"));
        cachingConnectionFactory.setVirtualHost(environment.getProperty("spring.rabbitmq.order-adaptor.virtual-host"));
        cachingConnectionFactory.setPublisherReturns(true);
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    /**
     * singleton can't set multi times callback
     * @return
     */
    @Bean(name = "cartOrderRabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Primary
    public RabbitTemplate cartOrderRabbitTemplate(@Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {

       RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
       rabbitTemplate.setMandatory(true);
       rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

       rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
           if (ack) {
               logger.info("cartOrder message send succeed:correlationData({}),ack({}),cause({})",correlationData, ack, cause);
           } else {
               logger.info("cartOrder message send failed:correlationData({}),ack({}),cause({})",correlationData, ack, cause);
           }
       });


       rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
           logger.info("cartOrder message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
       });

       return rabbitTemplate;

    }

    /**
     * singleton can't set multi times callback
     * @return
     */
    @Bean(name = "orderAdaptorRabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate orderAdaptorRabbitTemplate(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setRoutingKey("");
        rabbitTemplate.setDefaultReceiveQueue("");

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                logger.info("orderAdaptor message send succeed:correlationData({}),ack({}),cause({})",correlationData, ack, cause);
            } else {
                logger.info("orderAdaptor message send failed:correlationData({}),ack({}),cause({})",correlationData, ack, cause);
            }
        });


        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            logger.info("orderAdaptor message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;

    }

    @Bean(name = "cartOrderSingleListenerContainer")
    @Primary
    public SimpleRabbitListenerContainerFactory cartOrderSingleListenerContainer(SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer,
                                                                                 @Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    @Bean(name = "cartOrderMultiListenerContainer")
    @Primary
    public SimpleRabbitListenerContainerFactory cartOrderMultiListenerContainer(SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer,
                                                                                @Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class));
        factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class));
        factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class));

        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    @Bean(name = "orderAdaptorSingleListenerContainer")
    public SimpleRabbitListenerContainerFactory orderAdaptorSingleListenerContainer(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    @Bean(name = "orderAdaptorMultiListenerContainer")
    public SimpleRabbitListenerContainerFactory orderAdaptorMultiListenerContainer(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class));
        factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class));
        factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class));

        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    /**
     * Message persistent: Set the deliveryMode of the message to 2 and the consumer can continue to consume
     * the messages after persistence after restarting;
     * Use convertAndSend to send a message. The message is persistent by default. The following is the source code:
     * new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;
     *
     * @param exchangeName
     * @param routingKeyName
     * @param content
     * @param flag
     * @param messageId
     * @param <T>
     */
    public <T> void sendMessage(RabbitTemplate rabbitTemplate, String exchangeName, String routingKeyName, T content, boolean flag, String messageId) {
        logger.info("message send :messageId({}), exchangeName({}), routingKeyName({}), content({}), flag({})",
                messageId, exchangeName, routingKeyName, content);

        CorrelationData correlationData = new CorrelationData();
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");

        try {
            if (flag) {
                properties.setCorrelationId(messageId);
                correlationData.setId(messageId);

                rabbitTemplate.convertAndSend(exchangeName,
                        routingKeyName,
                        MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(),
                        correlationData);
            } else {
                rabbitTemplate.convertAndSend(exchangeName,
                        routingKeyName,
                        MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(),
                        correlationData);
            }
        } catch (Exception e) {
            logger.error("error message :e.getMessage({})", e.getMessage());
        }

    }


}

从代码中就可以看到我们定义了多个连接,@Primary表示我们默认的连接。因为在单个数据源的情况下,我们可以使用默认的加载方式读取到mq的配置,但是多数据源的情况下我们就必须明确的指出每个bean对应的配置详情了。但是到这一步我们仍然会出现问题,那就是我们的queue和exchange的绑定仍然是默认的。单数据源的时候我们是默认绑定到单独的mq上,现在我们需要动态的分配它们的绑定信息,所以QueueConfig方式就不能满足我们的需求了,于是我采用了RabbitAdmin这个类动态的帮助我绑定queue和exchange到指定的mq上。

@Configuration
public class CartOrderRabbitAdminConfig {

    private static final Logger logger = LogManager.getLogger(CartOrderRabbitAdminConfig.class);

    @Autowired
    private Environment environment;

    @Bean
    public RabbitAdmin cartOrderRabbitAdmin(@Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {

        RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
        rabbitAdmin.setAutoStartup(true);//place cart order
        Queue placeCartOrderQueue = RabbitmqUtil.createQueue(environment.getProperty("cart.place.order.queue"));
        DirectExchange placeCartOrderExchange = RabbitmqUtil.createExchange(environment.getProperty("cart.place.order.exchange"));
        Binding placeCartOrderBinding = RabbitmqUtil.createBinding(placeCartOrderQueue, placeCartOrderExchange, environment.getProperty("cart.place.order.routing.key"));
        RabbitmqUtil.createRabbitAdmin(placeCartOrderQueue, placeCartOrderExchange, placeCartOrderBinding, rabbitAdmin);

        //

        return rabbitAdmin;

    }
@Configuration
public class TicketOrderRabbitAdminConfig {

    private static final Logger logger = LogManager.getLogger(TicketOrderRabbitAdminConfig.class);

    @Autowired
    private Environment environment;

    @Bean
    public RabbitAdmin ticketOrderRabbitAdmin(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) {

        RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
        rabbitAdmin.setAutoStartup(true);

        //place ticket order
        Queue createCartOrderQueue = RabbitmqUtil.createQueue(environment.getProperty("ticketing.place.order.queue"));
        DirectExchange createCartOrderExchange = RabbitmqUtil.createExchange(environment.getProperty("ticketing.place.order.exchange"));
        Binding createCartOrderBinding = RabbitmqUtil.createBinding(createCartOrderQueue, createCartOrderExchange, environment.getProperty("ticketing.place.order.routing.key"));
        RabbitmqUtil.createRabbitAdmin(createCartOrderQueue, createCartOrderExchange, createCartOrderBinding, rabbitAdmin);

        return rabbitAdmin;

    }



}
public class RabbitmqUtil {

    public static DirectExchange createExchange (String exchangeName) {
        if(StringUtils.isNotBlank(exchangeName)) {
            return new DirectExchange(exchangeName, true, false);
        }

        return null;

    }

    public static Queue createQueue(String queueName) {
        if(StringUtils.isNotBlank(queueName)) {
            return new Queue(queueName, true);
        }

        return null;
    }

    public static Binding createBinding (Queue queueName, DirectExchange exchangeName, String routingKeyName) {
        if(Objects.nonNull(queueName) && Objects.nonNull(exchangeName) && StringUtils.isNotBlank(routingKeyName)) {
            return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName);
        }
        return null;
    }

    public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(binding);
    }
}

如上,我们就可以就可以动态绑定我们queue和exchange到目标mq上了,生产者和消费者和单数据源的情况下没有很大的区别。

原文地址:https://www.cnblogs.com/daishoucheng/p/12022451.html