十四、死信、延时和优先级队列

死信队列

DLX(Dead-Letter-Exchange),可以称为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。

消息变成死信队列有下面几个情况:

  1. 消息被拒绝(channel.basicNack或channel.basicReject),并且设置requeue参数为false;
  2. 消息过期;
  3. 队列达到最大长度。

在声明队列时,在队列参数属性中指定DLX,RabbitMQ就会将死信重新发布到指定的DLX上,从而被路由到死信队列中。通过监听这个队列的消息进行处理,并将消息的TTL设置为0配合使用可以弥补immediate参数的功能。

  • 通过在channel.queueDeclare方法添加DLX:

     //创建DLX
    channel.exchangeDeclare("dlx_exchange", BuiltinExchangeType.DIRECT, true);
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
    //为队列添加DLX
    channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments);
    

    也可以为DLX指定路由键,如果没有指定,则使用原队列的路由键:

    arguments.put("x-dead-letter-routing-key", DLX_QUEUE);
    

    下面代码设置了TTL、DLX、DLK(DLX路由键):

    public class Send {
        final static String NOREMAL_EXCHANGE = "normal_exchange";
        final static String NOREMAL_QUEUE= "normal_queue";
        final static String DLX_EXCHANGE= "dlx_exchange";
        final static String DLX_QUEUE= "dlx_queue";
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel;
            try {
                connection = ConnectionUtils.getConnection();
                channel = connection.createChannel();
    
                //正常交换器
                channel.exchangeDeclare(NOREMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
                //创建死信交换器
                channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true);
    
                //设置正常队列参数,添加死信队列
                Map<String, Object> arguments = new HashMap<>();
                //队列消息过期时间
                arguments.put("x-message-ttl", 1000);
                //设置死信队列
                arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
                //设置DLX路由键,不设置,则使用原队列的路由键
                arguments.put("x-dead-letter-routing-key", DLX_QUEUE);
                channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments);
                channel.queueBind(NOREMAL_QUEUE, NOREMAL_EXCHANGE, NOREMAL_QUEUE);
    
                channel.queueDeclare(DLX_QUEUE, true, false, false, null);
                channel.queueBind(DLX_QUEUE,DLX_EXCHANGE,DLX_QUEUE);
    
                channel.basicPublish(NOREMAL_EXCHANGE, NOREMAL_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, "死信队列".getBytes("utf-8"));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    代码里创建了两个交换器normal_exchangedlx_exchange,分别绑定了两个队列normal_exchangedlx_exchange

    运行上面代码后,10秒后,在Web管理页面中,可以看到如下内容。在10秒消息过期后变成死信,消息发布到了死信队列。也可以看到第二列中的DTTLDLX标记,D为持久化durable

延时队列

延时队列存储的对象是对应的延迟消息,延迟消息指当消息被发送到队列后,并不立即被消费者拿到消息,而是等待特定的时间后,消费者才能拿到消息。

延时队列应用场景有:

  • 在订单系统中,用户下单后,通常有30分钟的付款时间。如果30分钟后没有付款,则订单被取消,这里可以使用延时队列处理订单。
  • 用户通过远程控制家里的智能设备在指定时间进行工作,可以将用户指定发送到延时队列,到达时间后再推送到智能设备。

在RabbitMQ中,延时队列通过前面的DLX和TTL共同作用可以模拟出延迟队列功能。

和死信队列消费者监听正常队列不同,延时队列中消费者监听的是死信队列。当消费在设置DLX和TTL后,发送到队列中经过指定时候后变成死信,死信重新发送到死信队列,而消费者监听到死信队列中有消息而进行消费,这样就达到了消息的延时。

优先级队列、消息

优先级队列,具有优先级的队列具有高的优先权,优先级高的消息具有优先被消费的特权。

通过设置队列的x-max-priority参数设置队列优先级,之后在发送消息时设置消费优先级:

public class Send {
    final  static String exchange = "priority_test";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel;
        try {
            connection = ConnectionUtils.getConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);

            //设置队列的优先级为10
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-max-priority", 10);
            channel.queueDeclare(exchange, true, false, false, arguments);

            channel.queueBind(exchange, exchange, exchange);

            //设置消息的优先级
            AMQP.BasicProperties build = new AMQP.BasicProperties
                    .Builder()
                    .priority(5)   //设置消息的优先级为5,默认最低为0。最大不能超过队列优先级
                    .build();

            channel.basicPublish(exchange, exchange, build, "队列、消息优先级".getBytes("utf-8"));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行上面代码,可以在Web管理页面看到优先级队列第二列有Pri标志:

原文地址:https://www.cnblogs.com/zenghi-home/p/10065439.html