30. Spring Boot ActiveMQ

1.引入依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置文件设置

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

3. 配置类

package org.springframework.boot.autoconfigure.amqp;
import 。。。。
@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
    public RabbitAutoConfiguration() {
    }

    @Configuration
    @ConditionalOnClass({RabbitMessagingTemplate.class})
    @ConditionalOnMissingBean({RabbitMessagingTemplate.class})
    @Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class})
    protected static class MessagingTemplateConfiguration {
        protected MessagingTemplateConfiguration() {
        }

        @Bean
        @ConditionalOnSingleCandidate(RabbitTemplate.class)
        public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
            return new RabbitMessagingTemplate(rabbitTemplate);
        }
    }

    @Configuration
    @Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class})
    protected static class RabbitTemplateConfiguration {
        private final RabbitProperties properties;
        private final ObjectProvider<MessageConverter> messageConverter;
        private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;

        public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
            this.properties = properties;
            this.messageConverter = messageConverter;
            this.retryTemplateCustomizers = retryTemplateCustomizers;
        }

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
            if (messageConverter != null) {
                template.setMessageConverter(messageConverter);
            }

            template.setMandatory(this.determineMandatoryFlag());
            Template properties = this.properties.getTemplate();
            if (properties.getRetry().isEnabled()) {
                template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER));
            }

            properties.getClass();
            map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
            properties.getClass();
            map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
            properties.getClass();
            map.from(properties::getExchange).to(template::setExchange);
            properties.getClass();
            map.from(properties::getRoutingKey).to(template::setRoutingKey);
            properties.getClass();
            map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
            return template;
        }

        private boolean determineMandatoryFlag() {
            Boolean mandatory = this.properties.getTemplate().getMandatory();
            return mandatory != null ? mandatory : this.properties.isPublisherReturns();
        }

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnProperty(
            prefix = "spring.rabbitmq",
            name = {"dynamic"},
            matchIfMissing = true
        )
        @ConditionalOnMissingBean
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    }

    @Configuration
    @ConditionalOnMissingBean({ConnectionFactory.class})
    protected static class RabbitConnectionFactoryCreator {
        protected RabbitConnectionFactoryCreator() {
        }

        @Bean
        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
            PropertyMapper map = PropertyMapper.get();
            CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject());
            properties.getClass();
            map.from(properties::determineAddresses).to(factory::setAddresses);
            properties.getClass();
            map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
            properties.getClass();
            map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
            org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
            channel.getClass();
            map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
            channel.getClass();
            map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);
            Connection connection = properties.getCache().getConnection();
            connection.getClass();
            map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
            connection.getClass();
            map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
            connectionNameStrategy.getClass();
            map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
            return factory;
        }

        private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
            PropertyMapper map = PropertyMapper.get();
            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
            properties.getClass();
            map.from(properties::determineHost).whenNonNull().to(factory::setHost);
            properties.getClass();
            map.from(properties::determinePort).to(factory::setPort);
            properties.getClass();
            map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
            properties.getClass();
            map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
            properties.getClass();
            map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
            properties.getClass();
            map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
            Ssl ssl = properties.getSsl();
            if (ssl.isEnabled()) {
                factory.setUseSSL(true);
                ssl.getClass();
                map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
                ssl.getClass();
                map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
                ssl.getClass();
                map.from(ssl::getKeyStore).to(factory::setKeyStore);
                ssl.getClass();
                map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
                ssl.getClass();
                map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
                ssl.getClass();
                map.from(ssl::getTrustStore).to(factory::setTrustStore);
                ssl.getClass();
                map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
                ssl.getClass();
                map.from(ssl::isValidateServerCertificate).to((validate) -> {
                    factory.setSkipServerCertificateValidation(!validate);
                });
                ssl.getClass();
                map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
            }

            properties.getClass();
            map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);
            factory.afterPropertiesSet();
            return factory;
        }
    }
}

默认配置

@ConfigurationProperties(
    prefix = "spring.rabbitmq"
)
public class RabbitProperties {
    private String host = "localhost";
    private int port = 5672;
    private String username = "guest";
    private String password = "guest";
    private final RabbitProperties.Ssl ssl = new RabbitProperties.Ssl();
    private String virtualHost;
    private String addresses;
    @DurationUnit(ChronoUnit.SECONDS)
    private Duration requestedHeartbeat;
    private boolean publisherConfirms;
    private boolean publisherReturns;
    private Duration connectionTimeout;
    private final RabbitProperties.Cache cache = new RabbitProperties.Cache();
    private final RabbitProperties.Listener listener = new RabbitProperties.Listener();
    private final RabbitProperties.Template template = new RabbitProperties.Template();
    private List<RabbitProperties.Address> parsedAddresses;

SpringBoot 启动类

/**
 * 自动配置
 *  1、RabbitAutoConfiguration
 *  2、有自动配置了连接工厂ConnectionFactory;
 *  3、RabbitProperties 封装了 RabbitMQ的配置
 *  4、 RabbitTemplate :给RabbitMQ发送和接受消息;
 *  5、 AmqpAdmin : RabbitMQ系统管理功能组件;
 *      AmqpAdmin:创建和删除 Queue,Exchange,Binding
 *  6、@EnableRabbit +  @RabbitListener 监听消息队列的内容
 *
 */
@EnableRabbit  //开启基于注解的RabbitMQ模式
@SpringBootApplication
public class Springboot02AmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(Springboot02AmqpApplication.class, args);
    }
}

使序列化到MQ的对象消息能够以JSON的形式展现而不是乱码

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){

//        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
//        System.out.println("创建完成");

//        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        //创建绑定规则

//        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));

        //amqpAdmin.de
    }

    /**
     * 1、单播(点对点)
     */
    @Test
    public void contextLoads() {
        //Message需要自己构造一个;定义消息体内容和消息头
        //rabbitTemplate.send(exchage,routeKey,message);

        //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
        //rabbitTemplate.convertAndSend(exchage,routeKey,object);
        Map<String,Object> map = new HashMap<>();
        map.put("msg","这是第一个消息");
        map.put("data", Arrays.asList("helloworld",123,true));
        //对象被默认序列化以后发送出去
        rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩"));

    }

    //接受数据,如何将数据自动的转为json发送出去
    @Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }

    /**
     * 广播
     */
    @Test
    public void sendMsg(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
    }

}

  

相关文章:https://www.cnblogs.com/boshen-hzb/p/6841982.html

原文地址:https://www.cnblogs.com/guchunchao/p/10333434.html