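RabbitMQ message queue + Spring: listening to multiple MQ server IPs and consuming their messages

Background

Our company recently started a new platform project that needs to connect to several RabbitMQ servers. Below is an example of configuring multiple RabbitMQ servers in Spring Boot. I worked this setup out myself and it is already running in production. If you know a better approach, feel free to share it.

Project structure

Add the Maven dependencies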

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.3.10.RELEASE</version>
    </parent>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

Constants class

public class RabbitaMqConstant {

    /** Connection and queue settings for each broker */
    public final static String HOST_ONE = "81.69.230.42";
    public final static Integer PORT_ONE = 5673;
    public final static String USERNAME_ONE = "guest";
    public final static String PASSWORD_ONE = "guest";
    public final static String VIRTUALHOST_ONE = "/";
    public final static String QUEUE_ONE_NAME = "queue_one_name";
    public final static String QUEUE_ONE_KEY = "queueOneKey";


    public final static String HOST_TWO = "120.53.104.163";
    public final static Integer PORT_TWO = 5672;
    public final static String USERNAME_TWO = "admin";
    public final static String PASSWORD_TWO = "admin";
    public final static String VIRTUALHOST_TWO = "/";
    public final static String QUEUE_TWO_NAME = "queue_two_name";
    public final static String QUEUE_TWO_KEY = "queueTwoKey";

}

Properties class

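import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data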
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;

    /**
     * Connection settings for every broker, keyed by queue key.
     */
    public static Map<String, RabbitProperties> MULTI_MQPROPERTIES_MAP = new HashMap<String, RabbitProperties>() {
        {

            put(RabbitaMqConstant.QUEUE_ONE_KEY, RabbitProperties.builder()
                    .host(RabbitaMqConstant.HOST_ONE)
                    .port(RabbitaMqConstant.PORT_ONE)
                    .username(RabbitaMqConstant.USERNAME_ONE)
                    .password(RabbitaMqConstant.PASSWORD_ONE)
                    .virtualHost(RabbitaMqConstant.VIRTUALHOST_ONE)
                    .queueName(RabbitaMqConstant.QUEUE_ONE_NAME).build());

            put(RabbitaMqConstant.QUEUE_TWO_KEY, RabbitProperties.builder()
                    .host(RabbitaMqConstant.HOST_TWO)
                    .port(RabbitaMqConstant.PORT_TWO)
                    .username(RabbitaMqConstant.USERNAME_TWO)
                    .password(RabbitaMqConstant.PASSWORD_TWO)
                    .virtualHost(RabbitaMqConstant.VIRTUALHOST_TWO)
                    .queueName(RabbitaMqConstant.QUEUE_TWO_NAME).build());

            // To configure more brokers, add entries following the pattern above
        }
    };
}
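
The map above relies on double-brace initialization, which creates an anonymous HashMap subclass. As a hedged alternative, the sketch below builds the same kind of keyed lookup in a static initializer and exposes it as an unmodifiable map. `BrokerSettings` and `BrokerRegistry` are hypothetical names standing in for RabbitProperties, written without Lombok so the sketch compiles with the JDK alone, and the host names are placeholders.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RabbitProperties, reduced to three fields.
final class BrokerSettings {
    final String host;
    final int port;
    final String queueName;

    BrokerSettings(String host, int port, String queueName) {
        this.host = host;
        this.port = port;
        this.queueName = queueName;
    }
}

// Hypothetical registry: one entry per broker, keyed like MULTI_MQPROPERTIES_MAP.
final class BrokerRegistry {
    static final Map<String, BrokerSettings> BROKERS;

    static {
        Map<String, BrokerSettings> brokers = new LinkedHashMap<>();
        // Placeholder hosts; real values would come from the constants class.
        brokers.put("queueOneKey", new BrokerSettings("broker-one.example", 5673, "queue_one_name"));
        brokers.put("queueTwoKey", new BrokerSettings("broker-two.example", 5672, "queue_two_name"));
        // Callers can read the map but cannot add or remove brokers at runtime.
        BROKERS = Collections.unmodifiableMap(brokers);
    }

    public static void main(String[] args) {
        BROKERS.forEach((key, s) ->
                System.out.println(key + " -> " + s.host + ":" + s.port + " / " + s.queueName));
    }
}
```

Adding a third broker is then a single extra `put` call inside the static block.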

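RabbitMQ configuration class

This class uses Spring's bean factory to create the RabbitMQ beans for each server (connection factory, RabbitAdmin, RabbitTemplate and listener container) and registers them in the container during initialization.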

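import javax.annotation.PostConstruct;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.annotation.Configuration;

@Configuration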
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

    private final DefaultListableBeanFactory defaultListableBeanFactory;


    /**
     * Creates and registers the RabbitMQ beans for every configured broker.
     */
    @PostConstruct
    public void initRabbitmq() {
        RabbitProperties.MULTI_MQPROPERTIES_MAP.forEach((key, rabbitProperties) -> {

            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            // Register the connection factory in the container
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            // Register the RabbitAdmin in the container
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);
            Queue queue = new Queue(rabbitProperties.getQueueName());
            rabbitAdmin.declareQueue(queue);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // Enable correlated publisher confirms
            connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData,
                                    boolean ack, String cause) {
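                    if (ack) {
                        // Pitfall: ack is also true when the message reached the exchange
                        // but was not routed to any queue
                        log.info("====>ack success, correlationData:[{}]", correlationData);
                    } else {
                        // The broker did not confirm the message
                        log.error("====>message not confirmed, cause:[{}]", cause);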
                    }
                }
            });
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // Queue to listen on
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // Number of concurrent consumers to create (default is 1); raise it under heavy load,
            // and raise the max value below along with it
            simpleMessageListenerContainer.setConcurrentConsumers(10);
            // Maximum number of concurrent consumers
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // Do not requeue rejected messages by default
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // Use manual acknowledgement
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // Non-exclusive consumer
            simpleMessageListenerContainer.setExclusive(false);
            // Maximum number of unacknowledged messages per consumer
            simpleMessageListenerContainer.setPrefetchCount(5);
            // Listener class that consumes the MQ messages
            CrossborderReceiptListen crossborderReceiptListen = new CrossborderReceiptListen();
            // Map the queue to onMessage, the method that consumes the messages
            crossborderReceiptListen.addQueueOrTagToMethodName(rabbitProperties.getQueueName(),"onMessage");
            simpleMessageListenerContainer.setMessageListener(crossborderReceiptListen);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });
    }
}
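
With the settings above, each listener container starts 10 consumers and each consumer may hold up to 5 unacknowledged messages, so a single container can have at most 50 messages in flight for its broker. The arithmetic can be sketched as follows; `InFlightEstimate` and `maxUnacked` are hypothetical names, not part of Spring AMQP, and the sketch assumes every consumer uses its own channel, as SimpleMessageListenerContainer does.

```java
// Hypothetical helper that estimates the upper bound of unacknowledged
// messages held by one listener container.
final class InFlightEstimate {

    // Each consumer can hold up to prefetchCount unacked messages,
    // so the bound is consumers * prefetch.
    static int maxUnacked(int concurrentConsumers, int prefetchCount) {
        if (concurrentConsumers < 1 || prefetchCount < 1) {
            throw new IllegalArgumentException("both values must be positive");
        }
        return Math.multiplyExact(concurrentConsumers, prefetchCount);
    }

    public static void main(String[] args) {
        // Values taken from the configuration class: 10 consumers, prefetch 5.
        System.out.println(maxUnacked(10, 5)); // prints 50
    }
}
```

This bound is worth keeping in mind when raising the consumer count, since every extra consumer adds another prefetch window of messages that are held back from other consumers until they are acknowledged.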

Controller class

@RestController
public class IndexController {

    @Autowired
    private DefaultListableBeanFactory defaultListableBeanFactory;

    @RequestMapping("/send")
    public String send(){
        RabbitTemplate rabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean(RabbitaMqConstant.QUEUE_ONE_KEY+"RabbitTemplate");
        rabbitTemplate.convertAndSend(RabbitaMqConstant.QUEUE_ONE_NAME,"test queue_one_name");
        return "122432";
    }

    @RequestMapping("/send2")
    public String send2(){
        RabbitTemplate rabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean(RabbitaMqConstant.QUEUE_TWO_KEY+"RabbitTemplate");
        rabbitTemplate.convertAndSend(RabbitaMqConstant.QUEUE_TWO_NAME,"test QUEUE_TWO_NAME");
        return "122432";
    }
}
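
The controller finds each RabbitTemplate by concatenating the queue key with a suffix, which is the same convention the configuration class uses when it registers the beans. A minimal JDK-only sketch of that convention is shown below; `BeanNames` is a hypothetical helper, not part of the original project or of Spring.

```java
// Hypothetical helper that centralizes the "<queueKey><suffix>" bean naming
// convention used when registering and looking up the per-broker beans.
final class BeanNames {
    static final String CONNECTION_FACTORY = "ConnectionFactory";
    static final String RABBIT_ADMIN = "RabbitAdmin";
    static final String RABBIT_TEMPLATE = "RabbitTemplate";
    static final String LISTENER_CONTAINER = "SimpleMessageListenerContainer";

    // Builds the bean name for a given queue key and bean kind.
    static String of(String queueKey, String suffix) {
        if (queueKey == null || queueKey.isEmpty()) {
            throw new IllegalArgumentException("queueKey must not be empty");
        }
        return queueKey + suffix;
    }

    public static void main(String[] args) {
        System.out.println(of("queueOneKey", RABBIT_TEMPLATE));    // prints queueOneKeyRabbitTemplate
        System.out.println(of("queueTwoKey", CONNECTION_FACTORY)); // prints queueTwoKeyConnectionFactory
    }
}
```

Keeping the suffixes in one place avoids a mismatch between the name used at registration time and the name used at lookup time.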

Listener class

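import java.io.IOException;
import java.util.Base64;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Service;

@Service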
@Slf4j
public class CrossborderReceiptListen extends MessageListenerAdapter{

    @Override
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            log.info("============> Thread:[{}] received message:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
            log.info("====>connection:[{}]", channel.getConnection());
            String base64Str = new String(Base64.getEncoder().encode(message.getBody()));
            // Business logic goes here
            log.info("Result of calling the baohe callback interface, res:{}", base64Str);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("CrossborderReceiptListen onMessage error", e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
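
The listener turns the message body into text with `new String(byte[])`, which depends on the platform default charset. The sketch below shows the same payload handling with an explicit charset; `PayloadCodec` is a hypothetical helper, and it assumes that producers send UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for converting a raw message body, assuming UTF-8 payloads.
final class PayloadCodec {

    // Decodes the body as UTF-8 text regardless of the JVM default charset.
    static String asText(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Encodes the raw body as a Base64 string, as the listener does before logging.
    static String asBase64(byte[] body) {
        return Base64.getEncoder().encodeToString(body);
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(asText(body));   // prints hello
        System.out.println(asBase64(body)); // prints aGVsbG8=
    }
}
```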

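Bootstrap class

Note that there is a pitfall here: Spring Boot's auto-configuration loads the RabbitAutoConfiguration class, which conflicts with the RabbitMQ beans registered manually above, so that class must be excluded. If it is not excluded, startup fails with the following error: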

Description:

Parameter 1 of method rabbitTemplate in org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration$RabbitTemplateConfiguration required a single bean, but 3 were found:
    - rabbitConnectionFactory: defined by method 'rabbitConnectionFactory' in class path resource [org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration$RabbitConnectionFactoryCreator.class]
    - queueOneKeyConnectionFactory: defined in null
    - queueTwoKeyConnectionFactory: defined in null


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

The bootstrap class with the exclusion in place:

@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class RabbtiMqBootstrap {
    public static void main(String[] args) {
        SpringApplication.run(RabbtiMqBootstrap.class);
    }
}
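
The same exclusion can also be expressed in configuration instead of on the annotation. A fragment for application.properties, assuming the project uses that file (the original article only shows the annotation form):

```properties
# Exclude the RabbitMQ auto-configuration so it does not clash with the manually registered beans
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
```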

Startup test

Visit http://localhost:8080/send

The result is as follows:

Visit http://localhost:8080/send2

The result is as follows:

The test succeeded, haha.

Original article: https://www.cnblogs.com/cxyyh/p/14851744.html