Springboot整合RabbitMQ

1.pom修改

引入如下依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.修改application.properties 文件

spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

查看rabbitmq的配置如下:(还可以指定vhost等参数)

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {

    /**
     * RabbitMQ host.
     */
    private String host = "localhost";

    /**
     * RabbitMQ port.
     */
    private int port = 5672;

    /**
     * Login user to authenticate to the broker.
     */
    private String username;

    /**
     * Login to authenticate against the broker.
     */
    private String password;

    /**
     * SSL configuration.
     */
    private final Ssl ssl = new Ssl();

    /**
     * Virtual host to use when connecting to the broker.
     */
    private String virtualHost;

    /**
     * Comma-separated list of addresses to which the client should connect.
     */
    private String addresses;

    /**
     * Requested heartbeat timeout, in seconds; zero for none.
     */
    private Integer requestedHeartbeat;

    /**
     * Enable publisher confirms.
     */
    private boolean publisherConfirms;

    /**
     * Enable publisher returns.
     */
    private boolean publisherReturns;

    /**
     * Connection timeout, in milliseconds; zero for infinite.
     */
    private Integer connectionTimeout;

    /**
     * Cache configuration.
     */
    private final Cache cache = new Cache();

    /**
     * Listener container configuration.
     */
    private final Listener listener = new Listener();

    private final Template template = new Template();

    private List<Address> parsedAddresses;

        ...
}

3.direct类型的消息的发送和接收

1. 消息发送

1. 增加配置类,声明交换机和队列、以及将队列和交换机绑定

package cn.qlq.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * 配置一个directExchange
     * 
     * @return
     */
    @Bean
    public DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");
    }

    /**
     * 配置一个队列
     * 
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("bootDirectQueue");
    }

    /**
     * 建立一个绑定:队列和交换机绑定
     * 
     * @param directExchange
     *            交换机,上面的bean,用于自动注入
     * @param directQueue
     *            队列,上面的队列,自动注入
     * @return
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    }

}

2.生产者:

接口

package cn.qlq.rabbitmq;

public interface MessageService {

    void sendDirectMsg(String msg);

}

实现类:

package cn.qlq.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendDirectMsg(String msg) {
        // 第一种发送方式
        // MessageProperties property = null;
        // Message message = new Message(msg.getBytes(), property );
        // amqpTemplate.send(message );

        // 第二种:
        amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", msg);
    }

}

3. 测试类

package rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import cn.qlq.MySpringBootApplication;
import cn.qlq.rabbitmq.MessageService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MySpringBootApplication.class)
public class MQTest {

    @Autowired
    private MessageService messageService;

    @Test
    public void sendDirectMsgTest() {
        messageService.sendDirectMsg("bootDirect msg===123456");
    }

}

4.测试:

测试后可以到rabbitmq查看队列中有一条消息

 2.消息接收

消息接收有两种方式:

方式一:

    @Override
    public void receiveDirectMsg() {
        String msg = (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
        System.out.println("接收到d消息: " + msg);
    }

这种方式只能接收到一次消息,无法持续性的接收消息。

方式二:使用监听器的方式持续性接收消息

    /**
     * 监听器接收消息。不需要手动调用,springboot会自动监听
     */
    @RabbitListener(queues = { "bootDirectQueue" })
    @Override
    public void receiveDirectMsg(String msg) {
        System.out.println("监听器接收到的消息: " + msg);
    }

  这种方式会持续性监听,并且监听完会删除消息,自动应答。查看@RabbitListener注解的源码如下,

/** <a href="http://www.cpupk.com/decompiler">Eclipse Class Decompiler</a> plugin, Copyright (c) 2017 Chen Chao. **/
package org.springframework.amqp.rabbit.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.messaging.handler.annotation.MessageMapping;

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
    String id() default "";

    String containerFactory() default "";

    String[] queues() default {};

    Queue[] queuesToDeclare() default {};

    boolean exclusive() default false;

    String priority() default "";

    String admin() default "";

    QueueBinding[] bindings() default {};

    String group() default "";

    String returnExceptions() default "";

    String errorHandler() default "";

    String concurrency() default "";

    String autoStartup() default "";

    String executor() default "";

    String ackMode() default "";

    String replyPostProcessor() default "";
}

测试:使用@RabbitListeners注解可以监听多个队列,指定队列的应答方式为手动应答

接口:

package cn.qlq.rabbitmq;

import org.springframework.amqp.core.Message;

import com.rabbitmq.client.Channel;

public interface MessageService {

    void receiveDirectMsg(Message message, Channel channel);
}

实现类:

package cn.qlq.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class MessageServiceImpl implements MessageService {

    /**
     * 监听器接收消息。不需要手动调用,springboot会自动监听。设置应答模式为手动
     */
    @RabbitListeners({ @RabbitListener(queues = { "bootDirectQueue" }, ackMode = "MANUAL") })
    public void receiveDirectMsg(Message message, Channel channel) {
        try {
            log.info("basicReject, 监听器接收到的消息: " + new String(message.getBody()));
            // 当消费者把消息消费成功,再手动应答RabbitMQ
            // channel.basicAck(message.getMessageProperties().getDeliveryTag(),
            // false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

查看日志: 一直接收消息,而且队列不会删除消息。

2020-11-07 21:05:20.625  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
2020-11-07 21:05:20.626  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
2020-11-07 21:05:20.629  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
2020-11-07 21:05:20.632  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
2020-11-07 21:05:20.634  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
。。。

4.fanout消息类型的发送和接收

  因为是fanout消息类型的广播形式,可以用上面的bean形式来进行声明队列、绑定交换机,这种形式可以保证分布式应用一个应用多实例部署的情况下,只有一个队列,消息不会被多个应用重复消费。也可以用spring随机生成队列的形式来进行绑定生成队列。

1.消费者:

  两个方法,模拟两个消费者,直接绑定队列和交换机。队列名称用随机名称,而且自动删除(没有消费者的时候自动删除)。

    @RabbitListeners({ @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), exchange = @Exchange(name = "fanoutExchange", type = "fanout")) }) })
    @Override
    public void receiveFanoutMsg(Message message, Channel channel) {
        log.info("receiveFanoutMsg, 监听器接收到的消息: " + new String(message.getBody()));
    }
    
    @RabbitListeners({ @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), exchange = @Exchange(name = "fanoutExchange", type = "fanout")) }) })
    @Override
    public void receiveFanoutMsg2(Message message, Channel channel) {
        log.info("receiveFanoutMsg2, 监听器接收到的消息: " + new String(message.getBody()));
    }

启动应用后查看队列:(spring创建的默认队列。)

2.生产者:

    @Override
    public void sendFanoutMsg(String msg) {
        // 发送fanout消息,routingKey可以不指定
        amqpTemplate.convertSendAndReceive("fanoutExchange", "", msg);
    }

3.测试:

    @Test
    public void sendFanoutMsg() {
        for (int i= 0; i< 5; i ++) {
            messageService.sendFanoutMsg("bootFanout msg===" + i);
        }
    }

4.查看消费者控制台结果:

2020-11-07 21:45:49.255  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===0
2020-11-07 21:45:49.259  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===0
2020-11-07 21:45:54.042  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===1
2020-11-07 21:45:54.051  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===1
2020-11-07 21:45:59.040  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===2
2020-11-07 21:45:59.040  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===2
2020-11-07 21:46:04.061  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===3
2020-11-07 21:46:04.064  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===3
2020-11-07 21:46:09.044  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===4
2020-11-07 21:46:09.054  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===4

5.Topic类型交换机的实使用

  其实各种类型交换机的消费者都一样,只需要监听队列就可以了。只是交换机类型不同,所以需要绑定的时候指定不同的routingKey。

1.消费者

    /**
     * 这个注解声明队列与交换机并且进行绑定
     */
    @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic01"), key = {
            "aa" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
    public void receiveTopicMsg(Message message, Channel channel) {
        log.info("receiveTopicMsg, 监听器接收到的消息: " + new String(message.getBody()));
    }

    @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic02"), key = {
            "aa.*" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
    public void receiveTopicMsg2(Message message, Channel channel) {
        log.info("receiveTopicMsg2, 监听器接收到的消息: " + new String(message.getBody()));
    }

    @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic03"), key = {
            "aa.#" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
    public void receiveTopicMsg3(Message message, Channel channel) {
        log.info("receiveTopicMsg3, 监听器接收到的消息: " + new String(message.getBody()));
    }

  声明了3个队列。topic01通过routingkey为"aa"的与topicExchange绑定;topic02通过routingkey为"aa.*"的与topicExchange绑定;topic03通过routingkey为"aa.#"的与topicExchange绑定。

2.生产者:

    @Override
    public void sendTopicMsg(String msg, String routingKey) {
        amqpTemplate.convertSendAndReceive("topicExchange", routingKey, msg);
    }

3.测试类:

    @Test
    public void sendTopicMsg() {
        messageService.sendTopicMsg("bootTopic msg=== aa", "aa");
        messageService.sendTopicMsg("bootTopic msg=== aa.bb", "aa.bb");
        messageService.sendTopicMsg("bootTopic msg=== aa.bb.cc", "aa.bb.cc");
    }

4.查看消费者控制台:

2020-11-07 22:35:27.569  INFO 22764 --- [ntContainer#1-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg, 监听器接收到的消息: bootTopic msg=== aa
2020-11-07 22:35:27.574  INFO 22764 --- [ntContainer#2-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg3, 监听器接收到的消息: bootTopic msg=== aa
2020-11-07 22:35:32.547  INFO 22764 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg2, 监听器接收到的消息: bootTopic msg=== aa.bb
2020-11-07 22:35:37.531  INFO 22764 --- [ntContainer#2-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg3, 监听器接收到的消息: bootTopic msg=== aa.bb.cc

6.direct类型的交换机实现延迟队列

  简单的实现设置队列中消息的生存时间是1分钟,超时后自动转换路由器进入死信队列。

1.配置类:

    /**** S 实现延迟队列 **/
    public static final String ORDER_DIRECT_EXCHANGE = "order.exchange";
    public static final String ORDER_DIRECT_ROUTING_KEY = "order.routingKey";
    public static final String ORDER_DIRECT_QUEUE = "order.queue";

    public static final String ORDER_DIRECT_EXCHANGE_DEAD = "order.dead.exchange";
    public static final String ORDER_DIRECT_QUEUE_DEAD = "order.dead.queue";

    @Bean
    public DirectExchange orderExchange() {
        System.out.println("==============orderExchange");
        return new DirectExchange(ORDER_DIRECT_EXCHANGE);
    }

    @Bean
    public DirectExchange orderDeadExchange() {
        return new DirectExchange(ORDER_DIRECT_EXCHANGE_DEAD);
    }

    @Bean
    public Queue orderQueue() {
        Queue queue = new Queue(ORDER_DIRECT_QUEUE);
        // 设置消息生存时间内60s
        Map<String, Object> arguments = queue.getArguments();
        arguments.put("x-message-ttl", 60000);
        // 声明DLX名称(死信队列名称)
        arguments.put("x-dead-letter-exchange", ORDER_DIRECT_EXCHANGE_DEAD);

        return queue;
    }

    @Bean
    public Queue orderDeadQueue() {
        Queue queue = new Queue(ORDER_DIRECT_QUEUE_DEAD);
        return queue;
    }

    @Bean
    public Binding orderQueueExchangeBinding(DirectExchange orderExchange, Queue orderQueue) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_DIRECT_ROUTING_KEY);
    }

    @Bean
    public Binding orderDeadQueueExchangeBinding(DirectExchange orderDeadExchange, Queue orderDeadQueue) {
        return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(ORDER_DIRECT_ROUTING_KEY);
    }
    /**** E 实现延迟队列 **/

2.消费者:

    /**
     * 监听器接收消息。不需要手动调用,springboot会自动监听
     */
    @RabbitListener(queues = { "order.dead.queue" })
    public void receiveExpiredOrderMsg(Message message, Channel channel) {
        log.info("receiveExpiredOrderMsg, 监听器接收到的消息: " + new String(message.getBody()));
    }

3.生产者:

    @Override
    public void sendOrderMsg(String msg) {
        amqpTemplate.convertSendAndReceive("order.exchange", "order.routingKey", msg);
    }

4.测试类:

    @Test
    public void sendOrderMsgTest() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 5; i++) {
            messageService.sendOrderMsg("order " + i + ", time: " + simpleDateFormat.format(new Date()));
        }
    }

5.结果(可以看到接到消息的时候和订单生成时间相差1min) 

2020-11-07 23:06:18.042  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 0, time: 2020-11-07 23:05:17
2020-11-07 23:06:22.981  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 1, time: 2020-11-07 23:05:22
2020-11-07 23:06:27.963  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 2, time: 2020-11-07 23:05:27
2020-11-07 23:06:32.962  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 3, time: 2020-11-07 23:05:32
2020-11-07 23:06:37.966  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 4, time: 2020-11-07 23:05:37

补充:一般生产者和发送者不在一个服务中,所以在MQ的声明中一般是建议消费者服务和生产者服务都声明交换机、队列、以及binding,防止报错。但是如果消息接收者只接收消息的话,也可以在消息接收者端只声明队列;交换机、队列、绑定等在生产者声明,如下:

(1)生产者端配置: 声明交换机、队列、绑定

package cn.qz.cloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:25 2020/11/13
 */
@Configuration
public class RabbitConfig {

    /**
     * 配置一个directExchange
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("bootDirectExchange");
    }

    /**
     * 配置一个队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("bootDirectQueue");
    }

    /**
     * 建立一个绑定:队列和交换机绑定
     *
     * @param directExchange 交换机,上面的bean,用于自动注入
     * @param directQueue    队列,上面的队列,自动注入
     * @return
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    }
}

(2)消费者端配置:声明队列

package cn.qz.cloud.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:25 2020/11/13
 */
@Configuration
public class RabbitConfig {

    /**
     * 配置一个队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("bootDirectQueue");
    }
}

  这样无论是先启动生产者服务还是消费者服务都不会报错。

补充:SimpleMessageListenerContainer 和 MQMessageListener消息监听器的用法

SimpleMessageListenerContainer:简单消息监听容器。这个类有很多设置:监听队列(多个队列)、自动启动、自动声明功能、设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等、设置消费者数量、最大最小数量、批量消费、设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数等。

如下:

(1) 声明监听器:  实现ChannelAwareMessageListener接口

package cn.qz.cloud.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/**
 * 消息监听器
 *
 * @Author: qlq
 * @Description
 * @Date: 22:28 2020/11/13
 */
public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("---------->>消息为:" + msg);

        // 最后要应答或拒绝消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

(2) 配置类中声明

package cn.qz.cloud.config;

import cn.qz.cloud.mq.MyChannelAwareMessageListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:25 2020/11/13
 */
@Configuration
public class RabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 配置一个队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("bootDirectQueue");
    }

    @Bean
    public SimpleMessageListenerContainer myChannelAwareMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 添加队列监听
        messageListenerContainer.setQueues(directQueue());
        // 设置监听数据
        messageListenerContainer.setConcurrentConsumers(1);
        // 设置最大监听数据数量
        messageListenerContainer.setMaxConcurrentConsumers(5);
        //设置是否重回队列
        messageListenerContainer.setDefaultRequeueRejected(false);
        //手动确认消息
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置消息监听器
        messageListenerContainer.setMessageListener(new MyChannelAwareMessageListener());
        return messageListenerContainer;
    }
}

  启动服务即可实现消息的监听。

补充:关于rabbittemplate的消息转换机器

  默认使用的消息转换器是org.springframework.amqp.support.converter.SimpleMessageConverter。 这个抓换器发送消息时,发送的信息必须实现Serializable接口,就是一个普通的序列化。可以使用Jackson2JsonMessageConverter。

1. 默认的消息转换器

(1)存放一条消息,内容如下:

        Map<String, Object> map = new HashMap<>();
        map.put("name", "张三");

(2)控制台查看消息如下:

 (3) 消费者接收:

        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("---------->>消息为:" + msg);

结果:

---------->>消息为:�� sr java.util.HashMap���`� F 
loadFactorI     thresholdxp?@           t namet 张三x

(4) 正确处理,反序列一下:

        Map<String, Object> msg = (Map<String, Object>) SerializationUtils.deserialize(message.getBody());
        System.out.println("---------->>消息为:" + msg);

结果:

---------->>消息为:{name=张三}

2. 使用Jackson2JsonMessageConverter--实际就是转为JSON,在作为普通的JSON字符串序列化到mq中

(1) 设置RebbitTemplate的消息转换器

package cn.qz.cloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:25 2020/11/13
 */
@Configuration
public class RabbitConfig {

    /**
     * 配置一个directExchange
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("bootDirectExchange");
    }

    /**
     * 配置一个队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("bootDirectQueue");
    }

    /**
     * 建立一个绑定:队列和交换机绑定
     *
     * @param directExchange 交换机,上面的bean,用于自动注入
     * @param directQueue    队列,上面的队列,自动注入
     * @return
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void changeRabbitConverter() {
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }
}

(2)  发送消息同上

(3) 控制台查看

 (4) 消费者端: 

        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("---------->>消息为:" + msg);

结果:

---------->>消息为:{"name":"张三"}

(5) 也可以将读取到的消息转为JSON处理

        String msg = new String(message.getBody(), "UTF-8");
        HashMap hashMap = JSONObject.parseObject(msg, HashMap.class);
        System.out.println(hashMap);

  按理说消息接收者端应该是不用给 messageListenerContainer 设置消息转换器的,也看到代码有给消息的监听器容器设置消息转换器。。。。

原文地址:https://www.cnblogs.com/qlqwjy/p/13939544.html