RabbitMQ 延时队列

一、延时队列概念

延时队列最重要的特性体现在它的延时属性上,队列内部是有序的,延时队列中的消息是希望在到了指定时间之前或者之后被取出处理的

二、延时队列的应用场景

1、用户下了订单,十分钟之内未进行支付则自动取消订单

2、新创建的店铺,如果在十天之内都没有上架商品,则发送消息进行提醒

3、用户注册账号成功后,如果半个月没有登录,则发送消息进行提醒

4、用户发起退款,如果三天之内都没有得到处理,则发送消息通知相关运营人员进行处理

5、预定会议后,需要在预定的时间点前十分钟通知各个与会人员参与

上面的这些场景都有一个特点,需要在某个时间发生之前或者之后完成某一项任务,例如发生订单生成时间,在十分钟之后需要检查该订单的支付状态,如果订单未进行支付,需要将该订单关闭,理论上我们通过定时任务,一直轮询数据,每秒都查一次,取出所有十分钟之后未支付的订单,然后关闭就好了,如果数据量比较少,使用定时任务确实是一个不错的选择,但是,如果数据量比较大怎么办呢,轮询大量的数据对数据库的压力是很大的,并且实时性也不好(轮询大量数据需要时间),这样就无法满足业务要求,并且性能低下.这种情况下我们就可以使用 RabbitMQ 的延时队列了

三、延迟队列实战一

1、原理图

2、引入 Maven 依赖

<dependencies>
    <!--RabbitMQ 依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>2.9.2</version>
    </dependency>
    <!--RabbitMQ 测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>

3、application.yml

spring:
  rabbitmq:
    host: 192.168.59.130
    port: 5672
    username: admin
    password: admin123

4、RabbitMQ 配置类

/**
 * RabbitMQ 配置类
 */
@Configuration
public class RabbitConfig {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String NORMAL_QUEUE_1 = "normal_queue_1";
    private static final String NORMAL_QUEUE_2 = "normal_queue_2";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String NORMAL_ROUTING_KEY_1 = "nk1";
    private static final String NORMAL_ROUTING_KEY_2 = "nk2";
    private static final String DEAD_ROUTING_KEY = "dk";

    // 声明普通交换机
    @Bean("normal_exchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE, true, false);
    }

    // 声明普通队列 1,当该队列超时(30s)后会将消息传递到死信交换机
    @Bean("normal_queue_1")
    public Queue normalQueue1() {
        Map<String, Object> arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 设置消息超时时间(ms)
        arguments.put("x-message-ttl", 30 * 1000);
        // 普通队列 1 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_1).withArguments(arguments).build();
    }

    // 声明普通队列 1 绑定普通交换机
    @Bean
    public Binding normalQueue1BindingNormalExchange(@Qualifier("normal_queue_1") Queue queue, @Qualifier("normal_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_1);
    }

    // 声明普通队列 2,当该队列超时(10s)后会将消息传递到死信交换机
    @Bean("normal_queue_2")
    public Queue normalQueue2() {
        Map<String, Object> arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 设置消息超时时间(ms)
        arguments.put("x-message-ttl", 10 * 1000);

        return QueueBuilder.durable(NORMAL_QUEUE_2).withArguments(arguments).build();
    }

    // 声明普通队列 2 绑定普通交换机
    @Bean
    public Binding normalQueue2BindingNormalExchange(@Qualifier("normal_queue_2") Queue queue, @Qualifier("normal_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_2);
    }

    // 声明死信交换机
    @Bean("dead_exchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    // 声明死信队列
    @Bean("dead_queue")
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    // 声明死信队列绑定死信交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DEAD_QUEUE = "dead_queue";
    @RabbitListener(queues = DEAD_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

6、Producer

@Slf4j
@RestController
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY_1 = "nk1";
    private static final String NORMAL_ROUTING_KEY_2 = "nk2";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/ttl/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY_1, "消息来自 ttl 为 30S 的队列: " + message);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY_2, "消息来自 ttl 为 10S 的队列: " + message);
    }
}

7、测试过程及结果

启动项目,可以看到交换机和队列注册到了 RabbitMQ 服务器上

接着发送请求 http://localhost:8080/ttl/sendMsg/小毛毛变身

控制台显示 Producer 发送消息之后,Consumer 分别在 10 s 和 30 s 后从死信队列中消费到了消息,也就是生产者发送的两个消息分别延时了 10 s 和 30 s

 

四、延迟队列实战二

从上面的案例中我们已经实现了消息延迟功能,但是上面的案例存在不足,我们是将延时的时间直接设置在了 normal_queue_1、normal_queue_2 中,如果有新的需求,需要在 1 小时、2 小时之后发送呢,按照实战一中的逻辑,就必须要新增两个队列 normal_queue_3、normal_queue_4,让它们将消息延时 1 小时、2 小时,这样就很不友好,为了解决上述的弊端,我们可以设置一个通用的队列,然后通过 Producer 来设置消息的延时时间,这样就可以很灵活的控制延迟队列的时间了

1、原理图

2、RabbitMQ 配置类

@Configuration
public class RabbitConfig {
    private static final String COMMON_EXCHANGE = "common_exchange";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String COMMON_QUEUE = "common_queue";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String COMMON_ROUTING_KEY = "common";
    private static final String DEAD_ROUTING_KEY = "dk";

    // 声明通用交换机
    @Bean("common_exchange")
    public DirectExchange commonExchange() {
        return new DirectExchange(COMMON_EXCHANGE, true, false);
    }

    // 声明通用队列,当该队列超时(30s)后会将消息传递到死信交换机
    @Bean("common_queue")
    public Queue commonQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 普通队列 1 绑定死信交换机
        return QueueBuilder.durable(COMMON_QUEUE).withArguments(arguments).build();
    }

    // 声明通用队列绑定通用交换机
    @Bean
    public Binding normalQueue1BindingNormalExchange(@Qualifier("common_queue") Queue queue, @Qualifier("common_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(COMMON_ROUTING_KEY);
    }

    // 声明死信交换机
    @Bean("dead_exchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    // 声明死信队列
    @Bean("dead_queue")
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    // 声明死信队列绑定死信交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY);
    }
}

3、Producer

@Slf4j
@RestController
public class Producer {
    private static final String COMMON_EXCHANGE = "common_exchange";
    private static final String COMMON_ROUTING_KEY = "common";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/ttl/sendMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        // 设置延时时间
        rabbitTemplate.convertAndSend(COMMON_EXCHANGE, COMMON_ROUTING_KEY, message, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
            return messagePostProcessor;
        });
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
    }
}

4、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DEAD_QUEUE = "dead_queue";

    @RabbitListener(queues = DEAD_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

5、测试过程及结果

启动 Springboot 项目,查看 RabbitMQ 控制台的 Exchange、Queue

浏览器分别发送请求

http://localhost:8080/ttl/sendMsg/小毛毛变身/30000
http://localhost:8080/ttl/sendMsg/小毛毛真机智/1000

查看 IDEA 控制台

两个延时消息成功发送出去了,一个消息延时 30 s,另外一个消息延时 1 s,但是为什么 Consumer 接收到消息的时间是一样的呢,延时 1 s 的消息不是应该更早消费掉吗?

因为我们的代码是使用在消息属性上设置 TTL 的方式,消息可能并不会按时死亡,RabbitMQ 只会检查第一个消息是否过期,过期则丢到死信队列,如果第一个消息的延时很长很长,而第二个消息的延时很短,第二个消息也不会优先得到执行

五、延迟队列实战三

实战二中出现的状况确实是一个比较麻烦的问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列,那么如何解决呢?

1、安装延时队列插件

从官网下载延时队列插件,把下载好的插件上传到 RabbitMQ 安装目录的 plugins 目录下,执行如下命令即可

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后重启 RabbitMQ,重启完成之后查看 RabbitMQ 控制台,发现交换机类型多了一种类型 x-delayed-message

2、原理图

3、RabbitMQ 配置类

@Configuration
public class RabbitConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4、Producer

@Slf4j
@RestController
public class Producer {
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("当前时间:{}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DELAYED_QUEUE = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列信息{}", new Date().toString(), msg);
    }
}

6、测试过程及结果

启动 Springboot 项目,查看 RabbitMQ 控制台的 Exchange、Queue

浏览器分别发送

http://localhost:8080/sendDelayMsg/小毛毛变身/90000
http://localhost:8080/sendDelayMsg/小毛毛真可爱/10000

查看 IDEA 控制台,分别在 10 s、90 s 后消费者消费了该消息

 

原文地址:https://www.cnblogs.com/xiaomaomao/p/15550131.html