Spring整合RabbitMQ

1、安装RabbitMQ

  1)MAC:brew install rabbitmq    启动:rabbitmq-server

2、配置可视化RabbitMQ管理界面

  1)输入命令rabbitmq-plugins.bat enable rabbitmq_management,这样就可以添加可视化插件了。
  2)查看可视化插件是否成功:http://127.0.0.1:15672/

    用户名/密码:guest/guest

      

一、Spring整合RabbitMQ

官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/#preface

要求:

 Spring版本>=5.2.0

 amqp-client版本>=5.7.0

1、依赖

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.3.5</version>
</dependency>

spring-rabbit的一些依赖:

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-amqp</artifactId>
      <version>2.3.5</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.9.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.3.4</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
      <version>5.3.4</version>
      <scope>compile</scope>
    </dependency>
View Code

2、配置 RabbitmqConfig

@Configuration
public class RabbitmqConfig {

    /**
     * 连接工厂
     *
     * @return
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        // 指定rabbit服务地址
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
       /* // 集群环境中,指定集群地址,多个逗号隔开:host[:port],...
        cachingConnectionFactory.setAddresses("");
        // 集群环境中,表示随机设置连接顺序
        cachingConnectionFactory.setAddressShuffleMode(AbstractConnectionFactory.AddressShuffleMode.RANDOM);
        // 用户名和密码
        cachingConnectionFactory.setUsername("");
        cachingConnectionFactory.setPassword("");
        // 指定通道的缓存大小
        cachingConnectionFactory.setChannelCacheSize(25);*/
        return cachingConnectionFactory;
    }

    /**
     * @return
     */
    @Bean
    public RabbitAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());

        // 声明并创建队列
        Queue myQueue1 = new Queue("myqueue1");
        rabbitAdmin.declareQueue(myQueue1);
        Queue myQueue2 = new Queue("myqueue2");
        rabbitAdmin.declareQueue(myQueue2);

        // Direct交换器
        DirectExchange myExchange = new DirectExchange("myExchange");
        // 声明交换器,若不存在,则创建
        rabbitAdmin.declareExchange(myExchange);
        // 将队列myqueue绑定到Direct交换器myExchange上根据路由键myRouting
        rabbitAdmin.declareBinding(BindingBuilder.bind(myQueue()).to(myExchange).with("myRoutKey"));

        rabbitAdmin.declareBinding(BindingBuilder.bind(myQueue1).to(myExchange).with("myRoutKey1"));
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /**
     * 创建一个队列,设置队列消息持久化
     *
     * @return
     */
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue", true);
    }


    /**
     * 配置消息监听器容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 创建固定数量的监听器线程来处理监听到的消息,默认为1
//        container.setConcurrentConsumers(2);
        // 最大消息监听器并发线程数
//        container.setMaxConcurrentConsumers(10);
        // concurrentConsumers为2,maxConcurrentConsumers为10
        container.setConcurrency("2-10");
        // 监听的队列,多个队列逗号隔开
        container.setQueueNames("myqueue", "myqueue1", "myqueue2");
        // 消息监听器(适配器)
        container.setMessageListener(new MyMessageListener());
        return container;
    }

}

3、发送消息

可以使用下面的任何一种方法:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

    @Autowired
    RabbitTemplate rabbitTemplate;

        // 发送消息到默认的交换器,路由key为myqueue。默认交换器为"",因为所有队列都使用其队列名称作为绑定值自动绑定到默认交换器(直接交换),因此路由key就是队列名称
        rabbitTemplate.convertAndSend("myqueue", "hello rabbitmq");

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(30);
        messageProperties.setMessageId("123456789");
        // 发送消息到myExchange交换器上根据路由键RountKey绑定的消息队列中
        rabbitTemplate.send("myExchange", "myRoutKey", new Message("hello rabbitmq".getBytes(), messageProperties));
        // 发送消息到myqueue队列中
        rabbitTemplate.send("myqueue", new Message("hello rabbitmq".getBytes(), messageProperties));

4、消费消息

  方式1:@RabbitListener 注解

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

  方式2:定义消息监听器实现 org.springframework.amqp.core.MessageListener 接口

    ①:定义实现MessageListener 接口的消息监听器

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String mesg = new String(message.getBody());
    }

}

    或者实现MessageListener的子接口 ChannelAwareMessageListener,可以手动提交ack

public class MyMessageListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        // 处理完成确认消息,multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息
        // void basicAck(long deliveryTag, boolean multiple) throws IOException;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // 拒绝一个或多个接收到的消息,requeue为true,消息应该被重新排队,而不是被丢弃/死信; requeue为false,消息被丢弃
        // void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
//        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        // 拒绝一个消息,requeue为true,消息应该被重新排队,而不是被丢弃/死信; requeue为false,消息被丢弃
        // void basicReject(long deliveryTag, boolean requeue) throws IOException;
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }

}

    ②:在RabbitConfig中配置消息监听器容器并设置消息监听器

   /**
     * 配置消息监听器容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 创建固定数量的监听器线程来处理监听到的消息,默认为1
//        container.setConcurrentConsumers(2);
        // 最大消息监听器并发线程数
//        container.setMaxConcurrentConsumers(10);
        // concurrentConsumers为2,maxConcurrentConsumers为10
        container.setConcurrency("2-10");
        // 监听的队列,多个队列逗号隔开
        container.setQueueNames("myqueue1", "myqueue2", "myqueue3");
        // 消息监听器(适配器)
        container.setMessageListener(new MyMessageListener());
        return container;
    }

   增加消息监听器线程:如果 maxConcurrentConsumers 尚未达到,并且现有使用者已连续十个周期处于活动状态(

如果消费者在batchSize*receiveTimeout毫秒内至少收到一条消息,则认为该消费者处于活动状态

),并且自启动最后一个使用者以来至少经过了10秒钟,则将启动新的使用者。

  减少消息监听器线程:如果运行的并发消费者数量超过了,并且使用者检测到十个连续超时(空闲),并且最后一个使用者至少在60秒前停止,则该使用者将停止。超时取决于receiveTimeout和batchSize属性。如果使用者未接收到batchSize * receiveTimeout毫秒中的消息,则被认为是空闲的。因此,在默认超时(一秒)和batchSize为4的情况下,在40秒的空闲时间后考虑停止使用程序(四个超时对应于一个空闲检测)

  方式3:自定义消息监听器,使用 MessageListenerAdapter(消息监听适配器),消息监听适配器不是必须的,它用于适配没有实现 MessageListener的消息监听器和消息监听器容器的绑定

   通过消息监听适配器我们可以自定义消息监听器 不必实现 MessageListener ,及自定义消息处理方法

    ①:定义消息监听器

/**
 * 自定义消息监听器,需与监听器适配器一起使用
 */
public class MyCustomListener {
    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
}

    ②:RabbitConfig 中配置消息监听器容器

   /**
     * 配置消息监听器容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 监听的队列,多个队列逗号隔开
        container.setQueueNames("myqueue1", "myqueue2", "myqueue3");
        // 消息监听器(适配器)
        container.setMessageListener(listenerAdapter());
        return container;
    }

    /**
     * 消息监听器适配器:适配消息监听器和接收消息的方法名称
     * (必须注入此适配器,否则报错)
     *
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(new MyCustomListener(), "receiveMessage");
    }

  

二、SpringBoot整合RabbitMQ

官方地址:https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-amqp 

示例:https://spring.io/guides/gs/messaging-rabbitmq/#scratch

springboot版本:2.4.2

1、依赖

      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.4.2</version>
      </dependency>

 spring-boot-starter-amqp的依赖:

 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>2.4.2</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
      <version>5.3.3</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.3.4</version>
      <scope>compile</scope>
    </dependency>
View Code

END.

原文地址:https://www.cnblogs.com/yangyongjie/p/14463654.html