spring boot实战(第十二篇)整合RabbitMQ

 

前言

本篇主要讲述Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar

  1.  
    <dependency>
  2.  
    <groupId>org.springframework.boot</groupId>
  3.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  4.  
    </dependency>

消息生产者

不论是创建消息消费者或生产者都需要ConnectionFactory
 
 

ConnectionFactory配置

创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
 
  1.  
    @Configuration
  2.  
    public class AmqpConfig {
  3.  
     
  4.  
    public static final String EXCHANGE = "spring-boot-exchange";
  5.  
    public static final String ROUTINGKEY = "spring-boot-routingKey";
  6.  
     
  7.  
    @Bean
  8.  
    public ConnectionFactory connectionFactory() {
  9.  
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  10.  
    connectionFactory.setAddresses("127.0.0.1:5672");
  11.  
    connectionFactory.setUsername("guest");
  12.  
    connectionFactory.setPassword("guest");
  13.  
    connectionFactory.setVirtualHost("/");
  14.  
    connectionFactory.setPublisherConfirms(true); //必须要设置
  15.  
    return connectionFactory;
  16.  
    }
  17.  
    }

这里需要显示调用
 connectionFactory.setPublisherConfirms(true);
才能进行消息的回调。
 
 

RabbitTemplate

通过使用RabbitTemplate来对开发者提供API操作
 
  1.  
    @Bean
  2.  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  3.  
    //必须是prototype类型
  4.  
    public RabbitTemplate rabbitTemplate() {
  5.  
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
  6.  
    return template;
  7.  
    }
这里设置为原型,具体的原因在后面会讲到
 
  在发送消息时通过调用RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
  • exchange:交换机名称
  • routingKey:路由关键字

  • object:发送的消息内容

  • correlationData:消息ID

 
因此生产者代码详单简洁

Send.java

  1.  
    @Component
  2.  
    public class Send {
  3.  
     
  4.  
    private RabbitTemplate rabbitTemplate;
  5.  
     
  6.  
    /**
  7.  
    * 构造方法注入
  8.  
    */
  9.  
    @Autowired
  10.  
    public Send(RabbitTemplate rabbitTemplate) {
  11.  
    this.rabbitTemplate = rabbitTemplate;
  12.  
    }
  13.  
     
  14.  
    public void sendMsg(String content) {
  15.  
    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  16.  
    rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  17.  
    }
  18.  
     
  19.  
     
  20.  
    }

 

如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate

实际的ConfirmCallback为最后一次申明的ConfirmCallback。

下面给出完整的生产者代码:

 

  1.  
    package com.lkl.springboot.amqp;
  2.  
     
  3.  
    import java.util.UUID;
  4.  
     
  5.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6.  
    import org.springframework.amqp.rabbit.support.CorrelationData;
  7.  
    import org.springframework.beans.factory.annotation.Autowired;
  8.  
    import org.springframework.stereotype.Component;
  9.  
     
  10.  
    /**
  11.  
    * 消息生产者
  12.  
    *
  13.  
    * @author liaokailin
  14.  
    * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
  15.  
    */
  16.  
    @Component
  17.  
    public class Send implements RabbitTemplate.ConfirmCallback {
  18.  
     
  19.  
    private RabbitTemplate rabbitTemplate;
  20.  
     
  21.  
    /**
  22.  
    * 构造方法注入
  23.  
    */
  24.  
    @Autowired
  25.  
    public Send(RabbitTemplate rabbitTemplate) {
  26.  
    this.rabbitTemplate = rabbitTemplate;
  27.  
    rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
  28.  
    }
  29.  
     
  30.  
    public void sendMsg(String content) {
  31.  
    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  32.  
    rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  33.  
    }
  34.  
     
  35.  
    /**
  36.  
    * 回调
  37.  
    */
  38.  
    @Override
  39.  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  40.  
    System.out.println(" 回调id:" + correlationData);
  41.  
    if (ack) {
  42.  
    System.out.println("消息成功消费");
  43.  
    } else {
  44.  
    System.out.println("消息消费失败:" + cause);
  45.  
    }
  46.  
    }
  47.  
     
  48.  
    }

消息消费者

消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。

交换机

  1.  
    /**
  2.  
    * 针对消费者配置
  3.  
    FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  4.  
    HeadersExchange :通过添加属性key-value匹配
  5.  
    DirectExchange:按照routingkey分发到指定队列
  6.  
    TopicExchange:多关键字匹配
  7.  
    */
  8.  
    @Bean
  9.  
    public DirectExchange defaultExchange() {
  10.  
    return new DirectExchange(EXCHANGE);
  11.  
    }

在Spring Boot中交换机继承AbstractExchange类
 

 

队列

 
  1.  
    @Bean
  2.  
    public Queue queue() {
  3.  
    return new Queue("spring-boot-queue", true); //队列持久
  4.  
     
  5.  
    }

绑定

  1.  
    @Bean
  2.  
    public Binding binding() {
  3.  
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  4.  
    }
完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
 

消息消费

  1.  
    @Bean
  2.  
    public SimpleMessageListenerContainer messageContainer() {
  3.  
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  4.  
    container.setQueues(queue());
  5.  
    container.setExposeListenerChannel(true);
  6.  
    container.setMaxConcurrentConsumers(1);
  7.  
    container.setConcurrentConsumers(1);
  8.  
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  9.  
    container.setMessageListener(new ChannelAwareMessageListener() {
  10.  
     
  11.  
    @Override
  12.  
    public void onMessage(Message message, Channel channel) throws Exception {
  13.  
    byte[] body = message.getBody();
  14.  
    System.out.println("receive msg : " + new String(body));
  15.  
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  16.  
    }
  17.  
    });
  18.  
    return container;
  19.  
    }

下面给出完整的配置文件:
 
  1.  
    package com.lkl.springboot.amqp;
  2.  
     
  3.  
    import org.springframework.amqp.core.AcknowledgeMode;
  4.  
    import org.springframework.amqp.core.Binding;
  5.  
    import org.springframework.amqp.core.BindingBuilder;
  6.  
    import org.springframework.amqp.core.DirectExchange;
  7.  
    import org.springframework.amqp.core.Message;
  8.  
    import org.springframework.amqp.core.Queue;
  9.  
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  10.  
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  11.  
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  12.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  13.  
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  14.  
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  15.  
    import org.springframework.context.annotation.Bean;
  16.  
    import org.springframework.context.annotation.Configuration;
  17.  
    import org.springframework.context.annotation.Scope;
  18.  
     
  19.  
    import com.rabbitmq.client.Channel;
  20.  
     
  21.  
    /**
  22.  
    * Qmqp Rabbitmq
  23.  
    *
  24.  
    * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
  25.  
    *
  26.  
    * @author lkl
  27.  
    * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
  28.  
    */
  29.  
     
  30.  
    @Configuration
  31.  
    public class AmqpConfig {
  32.  
     
  33.  
    public static final String EXCHANGE = "spring-boot-exchange";
  34.  
    public static final String ROUTINGKEY = "spring-boot-routingKey";
  35.  
     
  36.  
    @Bean
  37.  
    public ConnectionFactory connectionFactory() {
  38.  
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  39.  
    connectionFactory.setAddresses("127.0.0.1:5672");
  40.  
    connectionFactory.setUsername("guest");
  41.  
    connectionFactory.setPassword("guest");
  42.  
    connectionFactory.setVirtualHost("/");
  43.  
    connectionFactory.setPublisherConfirms(true); //必须要设置
  44.  
    return connectionFactory;
  45.  
    }
  46.  
     
  47.  
    @Bean
  48.  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  49.  
    //必须是prototype类型
  50.  
    public RabbitTemplate rabbitTemplate() {
  51.  
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
  52.  
    return template;
  53.  
    }
  54.  
     
  55.  
    /**
  56.  
    * 针对消费者配置
  57.  
    * 1. 设置交换机类型
  58.  
    * 2. 将队列绑定到交换机
  59.  
    *
  60.  
    *
  61.  
    FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  62.  
    HeadersExchange :通过添加属性key-value匹配
  63.  
    DirectExchange:按照routingkey分发到指定队列
  64.  
    TopicExchange:多关键字匹配
  65.  
    */
  66.  
    @Bean
  67.  
    public DirectExchange defaultExchange() {
  68.  
    return new DirectExchange(EXCHANGE);
  69.  
    }
  70.  
     
  71.  
    @Bean
  72.  
    public Queue queue() {
  73.  
    return new Queue("spring-boot-queue", true); //队列持久
  74.  
     
  75.  
    }
  76.  
     
  77.  
    @Bean
  78.  
    public Binding binding() {
  79.  
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  80.  
    }
  81.  
     
  82.  
    @Bean
  83.  
    public SimpleMessageListenerContainer messageContainer() {
  84.  
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  85.  
    container.setQueues(queue());
  86.  
    container.setExposeListenerChannel(true);
  87.  
    container.setMaxConcurrentConsumers(1);
  88.  
    container.setConcurrentConsumers(1);
  89.  
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  90.  
    container.setMessageListener(new ChannelAwareMessageListener() {
  91.  
     
  92.  
    @Override
  93.  
    public void onMessage(Message message, Channel channel) throws Exception {
  94.  
    byte[] body = message.getBody();
  95.  
    System.out.println("receive msg : " + new String(body));
  96.  
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  97.  
    }
  98.  
    });
  99.  
    return container;
  100.  
    }
  101.  
     
  102.  
    }


以上完成 Spring Boot与RabbitMQ的整合 
 
 

自动配置

在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
  1.  
    spring.rabbitmq.host=localhost
  2.  
    spring.rabbitmq.port=5672
  3.  
    spring.rabbitmq.username=test
  4.  
    spring.rabbitmq.password=test
  5.  
    spring.rabbitmq.virtualHost=test

后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
 
自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
connectionFactory.setPublisherConfirms(true);
 
具体分析见后续文章的源码解读.
 
转载请注明 
http://blog.csdn.net/liaokailin/article/details/48186331
原文地址:https://www.cnblogs.com/pejsidney/p/9909522.html