rabbitmq的延迟队列(五)

延迟队列

在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

 rabbitmq-high-producer项目

 application.properties文件

复制代码
server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#是否启用【发布确认】,默认false
#spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否启用【发布返回】,默认false
spring.rabbitmq.publisher-returns=true
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必须在ack确认才能使用
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=2
        
复制代码

DelayController类
package com.qingfeng.rabbitmqhighproducer.delay;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("delay")
public class DelayController {
    @Autowired
    private RabbitTemplate rabbitTemplate;



    // 发送延迟队列消息
    //http://127.0.0.1:8081/delay/sendDelay
    @GetMapping("/sendDelay")
    public String sendDelay() {
        String messageId = String.valueOf(UUID.randomUUID());
        //order_exchange正常交换机  test.order.wq:正常交换机与正常绑定的队列的路由
        rabbitTemplate.convertAndSend("order_exchange", "order.wq", "订单id"+messageId);
        return "ok";
    }

}
DelayQueueRabbitConfig类
package com.qingfeng.rabbitmqhighproducer.delay.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 延迟队列
 */
@Configuration
public class DelayQueueRabbitConfig {

    //正常队列名称
    public static final String ORDER_QUEUE = "order_queue";
    //正常交换机名称
    public static final String ORDER_EXCHANGE = "order_exchange";

    //ttl过期时间  10s
    private static final int ORDER_EXPIRATION = 10000;

    //设置正常队列长度限制
    private static final int ORDER_LENGTH = 10;

    //死信队列名称
    public static final String ORDER_DLX_QUEUE = "order_dlx_queue";
    //死信交换机名称
    public static final String ORDER_DLX_EXCHANGE = "order_dlx_exchange";


    //声明正常交换机
    @Bean("orderExchange")
    public TopicExchange orderExchange(){
        return new TopicExchange(ORDER_EXCHANGE);
    }

    //声明正常队列绑定死信队列的交换机
    //.withArgument("x-dead-letter-routing-key", "order.cencel")为死信队列和死信交换机的绑定路由dlx.order.#
    @Bean("orderQueue")
    public Queue orderQueue(){
        return QueueBuilder.durable(ORDER_QUEUE)
                .withArgument("x-dead-letter-exchange", ORDER_DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "dlx.order.cancel")
                .withArgument("x-message-ttl", ORDER_EXPIRATION)
                .withArgument("x-max-length",ORDER_LENGTH)
                .build();
    }

    //声明正常队列和正常交换机的绑定
    @Bean
    public Binding orderBinding(){
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.#");
    }

//=========================================================================

    //声明死信队列
    @Bean("orderDlxQueue")
    public Queue orderDlxQueue(){
        return new Queue(ORDER_DLX_QUEUE);
    }
    //声明死信交换机
    @Bean("orderDlxExchange")
    public TopicExchange orderDlxExchange(){
        return new TopicExchange(ORDER_DLX_EXCHANGE);
    }
    //声明死信队列和死信交换机的绑定
    @Bean
    public Binding orderDlxBinding(){
        return BindingBuilder.bind(orderDlxQueue()).to(orderDlxExchange()).with("dlx.order.#");
    }


}
启动rabbitmq-high-producer项目
访问:http://127.0.0.1:8081/delay/sendDelay

 我们在设置的ttl过期时间10000毫秒过后,也就是10秒后,正常队列的消息会转到死信队列里面去

rabbitmq-high-consumer项目的

DelayListener类
package com.qingfeng.rabbitmqhighconsumer.delay;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer ACK机制:
 *  1. 设置手动签收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 *  2. 让监听器类实现ChannelAwareMessageListener接口
 *  3. 如果消息成功处理,则调用channel的 basicAck()签收
 *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
 */

@Component
public class DelayListener {

    //手动签收
    //延迟队列消息,是到了死信队列里面去了,我们要监听死信队列
    @RabbitHandler
    @RabbitListener(queues = "order_dlx_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println("接受到的消息为"+new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //int i = 1/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            /**
             * 4.有异常就拒绝签收
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             * alse:将消息丢弃
             */
            System.out.println("有异常就拒绝签收");
            //拒绝签收,不重回队列,requeue为false,这样才能到死信队列里面去
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

但我们启动rabbitmq-high-consumer项目

延迟队列小结:

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

原文地址:https://www.cnblogs.com/Amywangqing/p/14696155.html