【日常摘要】- RabbitMq实现延时队列

简介

  • 什么是延时队列?
    • 一种带有延迟功能的消息队列

过程:

  • 使用场景
    • 比如存在某个业务场景
      • 发起一个订单,但是处于未支付的状态?如何及时的关闭订单并退还库存?
      • 如何定期检查处于退款订单是否已经成功退款?
    • 为了解决上述的场景,就可以通过延时队列去处理
  • 简单实现
    /**
     * rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * rabbitAdmin
     */
    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * messageProperties
     */
    @Autowired
    private MessageProperties messageProperties;

    /**
     * 发送延迟队列消息
     *
     * @param exchange
     * @param content
     * @param millseconds
     */
    public void delayPublish(String exchange, String content, int millseconds) {
        String delayExchangeName = exchange + "_delay";
        String delayQueueName = delayExchangeName + "->queue_delay";
        String delayRouteKey = "dead";
        Map<String, Object> arguments = new HashMap<>();
        arguments.putIfAbsent("x-dead-letter-exchange", exchange);
        declareExchange(exchange);
        declareExchange(delayExchangeName);
        rabbitAdmin.declareQueue(new Queue(delayQueueName, true, false, false, arguments));
        rabbitAdmin.declareBinding(new Binding(delayQueueName, Binding.DestinationType.QUEUE, delayExchangeName,
                delayRouteKey, Collections.emptyMap()));
        MessageProperties messageProperties = getMessageProperties(Collections.emptyMap());
        messageProperties.setExpiration(Integer.toString(millseconds));
        publish(delayExchangeName, content, messageProperties);
    }
    
    /**
     * sendMessage
     *
     * @param exchange
     * @param content
     * @param messageProperties
     */
    private void publish(String exchange, String content, MessageProperties messageProperties) {
        declareExchange(exchange);
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        try {
            rabbitTemplate.send(exchange, "", message);
            log.debug("推送给exchange:{},消息体:{} 成功", exchange, content);
        } catch (Exception e) {
            log.error("推送给exchange:{},消息体:{} 失败!!", exchange, content, e);
            throw e;
        }
    }
    
    /**
     * declareExchange
     *
     * @param exchange
     */
    private void declareExchange(String exchange) {
        rabbitAdmin.declareExchange(new FanoutExchange(exchange, true, false, null));
    }
    
    /**
     * getMessageProperties
     *
     * @param header
     * @return
     */
    private MessageProperties getMessageProperties(Map<String, String> header) {
        if (header == null) {
            return this.messageProperties;
        }

        MessageProperties customMessageProperties = new MessageProperties();
        customMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        customMessageProperties.setHeader("content_encoding", "JSON");
        for (Map.Entry<String, String> item : header.entrySet()) {
            customMessageProperties.setHeader(item.getKey(), item.getValue());
        }
        return customMessageProperties;
    }

 
  • 通过上述代码就可以实现一个简单的延时队列消息的发布
  • 那么只需要进行对应的监听便可以进行消费,达到延时队列的发布及消费的功能
原文地址:https://www.cnblogs.com/lycsmzl/p/13356198.html