RabbitMQ整合Spring

之前我们使用 RabbitMQ 原生的 API 方法来实现MQ的使用,Spring 也提供了 RabbitMQ 的集成,让我们更方便的使用MQ,让我们来学习下吧。

Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等,很大方便我们使用 RabbitMQ 程序的相关开发。

一、RabbitAdmin 管理组件

1.1 准备工作:

  1. 添加 Spring AMQP 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
  1. 声明 Bean 对象
@Configuration
public class RabbitMQConfig {
    /**
     * 注入连接工厂对象
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 必须显式设置为 True ,否则 Spring 容器不会加载
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

1.2 Exchange 操作

相关方法:

方法定义 作用
void declareExchange(Exchange exchange) 声明交换机
boolean deleteExchange(String exchange) 删除交换机

添加交换机

@SpringBootTest
public class ExchangeAddTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldAddExchangeSuccess() {
        rabbitAdmin.declareExchange(new DirectExchange("admin.direct", true, false));
        rabbitAdmin.declareExchange(new TopicExchange("admin.topic", false, true));
        rabbitAdmin.declareExchange(new FanoutExchange("admin.fanout", false, false));
    }
}    

删除交换机

@SpringBootTest
public class ExchangeAddTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Test
    public void shouldDeleteExchangeSuccess() {
        boolean result = rabbitAdmin.deleteExchange("admin.direct");
        Assert.assertTrue(result);
    }
}    

1.3 Queue 操作

方法定义 作用
Queue declareQueue() 声明默认队列
String declareQueue(Queue queue) 申明给定的队列
boolean deleteQueue(String queueName) 删除队列
void deleteQueue(String queueName, boolean unused, boolean empty) 删除队列
void purgeQueue(String queueName, boolean noWait) 清除队列信息,noWait = true 时异步执行
int purgeQueue(String queueName) 清除队列信息
Properties getQueueProperties(String queueName) 获取指定队列的属性

声明队列

@SpringBootTest
public class QueueTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldAddQueueSuccess() {
        // 创建默认队列
        Queue defaultQueue = rabbitAdmin.declareQueue();
        Assert.assertNotNull(defaultQueue);
        Assert.assertEquals(false,defaultQueue.isDurable());

        // 创建指定名称和是否持久化属性的队列
        String  queueName =  rabbitAdmin.declareQueue(new Queue("orderQueue",true));
        Assert.assertNotNull(queueName);
        Assert.assertEquals("orderQueue",queueName);
    }
}    

注: 默认的队列因为设置 exclusive = true ,导致在其连接断开的时候自动删除,所以图中看不到。

删除队列

@SpringBootTest
public class QueueTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
   
    @Test
    public void shouldDeleteQueueSuccess() {
        boolean result = rabbitAdmin.deleteQueue("orderQueue");
        Assert.assertTrue(result);
    }   
}    

1.4 Binding 绑定

方法定义 作用
void declareBinding(Binding binding) 声明队列与交换机的绑定
void removeBinding(Binding binding) 删除队列与交换机的绑定

声明队列与交换机的绑定

@SpringBootTest
public class BindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldBindingSuccess() {
        // 交换机名称
        String exchange = "admin.topic";
        // 队列名称
        String queueName = "orderQueue";
        // 1.创建绑定关系对象
        Binding binding =
                BindingBuilder
                        // 创建队列
                        .bind(new Queue(queueName, true))
                        // 创建交换机
                        .to(new TopicExchange(exchange, true, false))
                        // 指定路由 Key
                        .with("order#");
        // 2.进行绑定
        rabbitAdmin.declareBinding(binding);
    }

}

删除队列与交换机的绑定

@SpringBootTest
public class BindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
	
    @Test
    public void shouldUnBindingSuccess() {
        // 交换机名称
        String exchange = "admin.topic";
        // 队列名称
        String queueName = "orderQueue";
        Binding binding =
                new Binding(queueName, Binding.DestinationType.QUEUE, exchange, "order#", null);
        rabbitAdmin.removeBinding(binding);
    }
}    

1.5 bean 注入

除了上面的通过代码显式申明交换机、队列、路由 之外,还可以通过 Bena 注入的形式申明。

@Configuration
public class RabbitMQConfig {

    /**
     * 注入连接工厂对象
     *
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 必须显式设置为 True ,否则 Spring 容器不会加载
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public TopicExchange beanExchange() {
        return new TopicExchange("beanExchange", true, false);
    }

    @Bean
    public Queue beanQueue() {
        return new Queue("beanQueue", true);
    }

    @Bean
    public Binding beanBinding(TopicExchange beanExchange, Queue beanQueue) {
        return BindingBuilder
                // 创建队列
                .bind(beanQueue)
                // 创建交换机
                .to(beanExchange)
                // 指定路由 Key
                .with("bean#");
    }
}
@SpringBootTest
public class BeanInjectionBindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private Binding beanBinding;

    @Test
    public void shouldBindingSuccess() {
        rabbitAdmin.declareBinding(beanBinding);
    }

}

二、RabbitTemplate 模板组件

如果你看过 RabbitAdmin 的源码,可以看到里面使用到了一个叫做 RabbitTemplate 的对象,它就是 Spring 提供的消息模板,封装了 RabbitMQ 核心 API 的一系列方法,而 RabbitAdmin 是在它之上的另一层封装。

2.1 常用方法

方法定义 作用
void send(Message message) 发送消息
void convertAndSend(Object object) 将 Java 对象包装成 Message 对象并发送 ,Java 对象需要实现 Serializable 序列化接口
Message receive(String queueName) 接收消息
receiveAndConvert(String queueName) 接收消息并将 Message 转换成 Java 对

2.2 发送消息

@SpringBootTest
public class MessageTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void shouldSendMessageSuccess(){
        // 创建消息属性对象
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        // 创建消息对象
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        // 发送消息
        rabbitTemplate.send("beanExchange","bean#",message);

        // 发送消息时额外增加属性
        Message newMessage = new Message("newMessage".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("beanExchange", "bean#", newMessage, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }
}    

2.3 手动接收消息

@SpringBootTest
public class MessageTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;  
	
    @Test
    public void shouldConsumeMessageSuccess() {
        Message msg = rabbitTemplate.receive("beanQueue", 2000l);
        System.out.println("消息内容:" + new String(msg.getBody()));
        final Map<String, Object> headers = msg.getMessageProperties().getHeaders();
        System.out.println("=======消息头属性=======");
        for (String key : headers.keySet()) {
            System.out.println("key =" + key + " ; value =" + headers.get(key));
        }
    }
}    

执行方法,观察控制台输出:

消息内容:Hello RabbitMQ
=======消息头属性=======
key =type ; value =自定义消息类型..
key =desc ; value =信息描述..

再次执行方法,观察控制台输出:

消息内容:newMessage
=======消息头属性=======
key =type ; value =自定义消息类型..
key =attr ; value =额外新加的属性
key =desc ; value =额外修改的信息描述

2.4 消息监听容器

在实际项目中我们不可能采用手动接收消息的形式来消费消息,这个时候 Spring 就为我们提供了一个消息监听容器 SimpleMessageListenerContainer

它的功能如下:

* 监听多个队列
* 设置消费者消费数量
* 设置消息确认和自动确认模式
* 是否重回队列
* 异常捕获 handel 函数
* 设置消费者属性
* 设置具体的监听器和消息转换器

SimpleMessageListenerContainer 可以在运行过程中动态修改属性,如修改消费者消费数量大小、接收消息的模式等

@Configuration
public class RabbitMQConfig {
    ......
	@Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue beanQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列,可以有多个
        container.setQueues(beanQueue);
        // 设置并发消费者数量
        container.setConcurrentConsumers(1);
        // 设置最大并发消费者数量
        container.setMaxConcurrentConsumers(5);
        // 设置是否重回队列
        container.setDefaultRequeueRejected(false);
        // 设置签收模式,这里为了演示使用自动签收,实际项目中需要使用手动签收 AcknowledgeMode.MANUAL
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 设置消费端标签策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        // 设置消息监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("消费端监听:" + msg);
            }
        });
        return container;
    }
}

启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:

消费端监听:Hello RabbitMQ

证明消费端监听并消费成功。

2.5 消息监听适配器

除了直接使用 ChannelAwareMessageListener 实现消息事件监听外,还可以通过消息监听适配器(MessageListenerAdapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于 Rabbit API

实际上就是相当于自己实现 ChannelAwareMessageListener 功能。

  1. 新建 MessageDelegate
public class MessageDelegate {
    public void consumeMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }
}
  1. 替换原 ChannelAwareMessageListener 事件
//   container.setMessageListener(new ChannelAwareMessageListener() {
//       @Override
//       public void onMessage(Message message, Channel channel) throws Exception {
//           String msg = new String(message.getBody());
//           System.err.println("消费端监听:" + msg);
//       }
//   });
// 设置消息监听
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);
  1. 启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
默认方法, 消息内容:Hello RabbitMQ

证明消费端监听并消费成功。

我们还可以将队列名和方法做绑定,实现转发功能:

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
// adapter.setDefaultListenerMethod("consumeMessage");
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(beanQueue.getName(), "consumeMessage");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);    

2.6 消息转换器

我们现在发送和接受消息的类型都是二进制形式传输,我们可以通过 MessageConverter 进行转换。

  1. 新建 TextMessageConverter
public class TextMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}
  1. adapter 设置转换类
adapter.setMessageConverter(new TextMessageConverter());
  1. MessageDelegate 新增 字符串参数的方法
public class MessageDelegate {
	......
    public void consumeMessage(String messageBody){
        System.err.println("字符串类型, 消息内容:" + new String(messageBody));
    }
}
  1. 新增发送文本消息测试方法
@Test
public void shouldSendTextMessageSuccess() {
    // 创建消息属性对象
    MessageProperties messageProperties = new MessageProperties();
    // 通过设置属性,让消费端知道要将消息内容转换成文本类型
    messageProperties.setContentType("text");
    // 创建消息对象
    Message message = new Message("字符串消息".getBytes(), messageProperties);
    // 发送消息
    rabbitTemplate.send("beanExchange", "bean#", message);
}

启动应用后,执行发送消息测试方法,观察控制台输出:

字符串类型, 消息内容:字符串消息
原文地址:https://www.cnblogs.com/markLogZhu/p/13273809.html