延迟队列模拟关单

上接以前写的一篇博客:RabbitMQ:TTL队列/消息&&死信队列

image-20201123201417434

部分配置文件:

spring:  
  rabbitmq:
    host: 192.168.1.43
    virtual-host: /
    port: 5672
    publisher-confirms: true #开启发送端确认
    publisher-returns: true #开启消息发送端消息抵达队列的确认
    template:
      #只要未抵达队列,以异步方式优先回调我们returnCallback
      mandatory: true
    listener:
      simple:
        #手动确认消息(关闭自动ack)
        acknowledge-mode: manual

controller:用controller模拟

@Controller
public class HelloController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @ResponseBody
    @GetMapping("/test/createOrder")
    public R createOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        //向MQ发送消息
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return R.ok().addData(entity);
    }

}

配置类:按照上图,创建对应的Queue,Binding,Exchange

@Slf4j
@Configuration
public class MQConfiguration {

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        log.info("关闭订单,订单信息{}",entity.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


    @Bean
    public Queue orderDelayQueue(){

        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);

        return new Queue("order.delay.queue",
                true,
                false,
                false,
                arguments);
    }

    @Bean
    public Queue orderReleaseQueue(){
        return new Queue("order.release.order.queue",
                true,
                false,
                false);
    }

    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange("order-event-exchange", true, false);
    }

    @Bean
    public Binding orderCreateOrderBinding(){
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    @Bean
    public Binding orderReleaseOrderBinding(){
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }
}
@Configuration
public class RabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制rabbitTemplate
     */
    @PostConstruct
    public void initRabbitTemplate() {
        /**
         * 确认回调
         *correlationData: 当前消息的唯一关联数据(id)
         * ack 是否成功收到
         * cause:失败原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("消息确认" + correlationData + ":" + ack + ":" + cause);
        });

        //消息尚未抵达队列:回调
        /**
         * message 投递失败的消息详细信息
         * replyCode 回复的状态码
         * replyText 回复的文本内容
         * exchange 交换机
         * routingKey 用哪个路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println(message + ":" + replyCode + ":" + replyText + ":" + exchange + ":" + routingKey);
        });
    }
}

然后用controller模拟发送订单请求:

image-20201123201902797

rabbitmq management出现消息在死信队列中:

image-20201123202033568

一分钟后,控制台打印信息:

image-20201123201928500

延时队列测试成功。

原文地址:https://www.cnblogs.com/wwjj4811/p/14026702.html