Native RabbitMQ DLX

在消息过期、超过队列最大长度、被拒绝后,如果不重新入队,那可以选择进入死信队列(dead letter),通常死信队列配合消息过期策略可以应用在延时订单的业务场景下。

以下场景为:两个正常消费者A和B,一个拒绝消息的消费者C,和一个监听死信消息的消费者。

正常消费者A

/**
 * 普通消费者A
 *
 * @author zhangjianbing
 * time 2020/09/04
 */
@SuppressWarnings("Duplicates")
public class NormalConsumerA {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.1.1.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("beijing");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("beijing");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道
        final Channel channel = connection.createChannel();

        String queue = "WULIU.CALLBACK.QUEUE";
        System.out.println("正在等待消息。。。。。。");

        /**声明一个消费者**/
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("NormalConsumerA收到的消息:" + message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        /**手动确认模式**/
        channel.basicConsume(queue, false, consumer);
    }

}

正常消费者B

/**
 * 普通消费者B
 *
 * @author zhangjianbing
 * time 2020/09/04
 */
@SuppressWarnings("Duplicates")
public class NormalConsumerB {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.1.3.37");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("beijing");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("beijing");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道
        final Channel channel = connection.createChannel();

        String queue = "WULIU.CALLBACK.QUEUE";
        System.out.println("正在等待消息。。。。。。");

        /**声明一个消费者**/
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("NormalConsumerB收到的消息:" + message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        /**手动确认模式**/
        channel.basicConsume(queue, false, consumer);
    }

}

拒绝消费者C

/**
 * 拒绝消费者
 *          当多个消费者共同消费同一个队列的时候
 *          此消费者会根据业务需求来拒绝消息,可以选择是否将消息重新入队
 *          重新入队的消息会被打上一个重新入队的标签
 *          重新入队的消息会放在队列的末尾,再次轮询投递给队列上的消费者
 *
 * 拒绝消息有两种方式:
 *          ① basicNack
 *          ② basicReject
 *          这俩唯一的不同就是basicNack有批量拒绝参数
 *
 * @author zhangjianbing
 * time 2020/09/04
 */
@SuppressWarnings("Duplicates")
public class RejectConsumer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.1.3.37");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("beijing");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("beijing");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道
        final Channel channel = connection.createChannel();

        String queue = "WULIU.CALLBACK.QUEUE";
        System.out.println("正在等待消息。。。。。。");

        /**声明一个消费者**/
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("RejectConsumer收到的消息:" + message);
                try {
                    throw new RuntimeException("RejectConsumer业务处理发生异常");
                } catch (Exception re) {
                    System.out.println(re.getMessage() + ",拒绝消息");
                    channel.basicReject(envelope.getDeliveryTag(), false);// 不重新入队
                }
            }
        };

        /**手动确认模式**/
        channel.basicConsume(queue, false, consumer);
    }

}

死信消费者

/**
 * 死信队列消费者
 *
 * @author zhangjianbing
 * time 2020/9/4
 */
@SuppressWarnings("Duplicates")
public class DLXConsumer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.1.3.37");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("beijing");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("beijing");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道
        final Channel channel = connection.createChannel();

        String queue = "DLX.CALLBACK.QUEUE";
        System.out.println("死信消费者正在等待消息。。。。。。");

        /**声明一个消费者**/
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("死信消费者收到的消息:" + message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        /**手动确认模式**/
        channel.basicConsume(queue, false, consumer);
    }

}

生产者

/**
 * @author zhangjianbing
 * time 2020/9/4
 */
@SuppressWarnings("Duplicates")
public class Producer {

    public static final String EXCHANGE_NAME = "reject-exchange";

    public static final String DLX_EXCHANGE_NAME = "dlx-exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.1.3.37");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("beijing");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("beijing");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道
        final Channel channel = connection.createChannel();

        /** 声明普通交换器、队列以及绑定,并将死信交换器参数传进来 **/
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);// 将死信交换器的参数argsMap传进来
        String queue = "WULIU.CALLBACK.QUEUE";
        channel.queueDeclare(queue, true, false, false, argsMap);
        String routeKey = "error";
        channel.queueBind(queue, EXCHANGE_NAME, routeKey);

        /** 声明死信交换器、队列以及绑定 **/
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        String dlxQueue = "DLX.CALLBACK.QUEUE";// 死信队列的名称
        channel.queueDeclare(dlxQueue, true, false, false, null);
        /**
         * 这里如果死信交换器声明成DIRECT交换器,那么消息被重新publish到死信交换器的时候是自带路由键的,
         * 所以这里需要将死信交换器和原来的路由键完全匹配绑定,消息才会被路由到死信队列
         * 如果死信交换器声明成TOPIC交换器,路由键可以设置成 #,这样所有被拒绝的消息都会重新进入死信队列中
         */
        channel.queueBind(dlxQueue, DLX_EXCHANGE_NAME, "#");

        for (int i = 1; i < 11; i++) {
            String message = "hello_world_" + i;
            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
        // 关闭信道和连接
        channel.close();
        connection.close();
    }

}

测试结果

生产者

生产者发送消息:hello_world_1
生产者发送消息:hello_world_2
生产者发送消息:hello_world_3
生产者发送消息:hello_world_4
生产者发送消息:hello_world_5
生产者发送消息:hello_world_6
生产者发送消息:hello_world_7
生产者发送消息:hello_world_8
生产者发送消息:hello_world_9
生产者发送消息:hello_world_10

消费者A

正在等待消息。。。。。。
NormalConsumerA收到的消息:hello_world_1
NormalConsumerA收到的消息:hello_world_4
NormalConsumerA收到的消息:hello_world_7
NormalConsumerA收到的消息:hello_world_10

消费者B

正在等待消息。。。。。。
NormalConsumerB收到的消息:hello_world_2
NormalConsumerB收到的消息:hello_world_5
NormalConsumerB收到的消息:hello_world_8

拒绝消费者C

正在等待消息。。。。。。
RejectConsumer收到的消息:hello_world_3
RejectConsumer业务处理发生异常,拒绝消息
RejectConsumer收到的消息:hello_world_6
RejectConsumer业务处理发生异常,拒绝消息
RejectConsumer收到的消息:hello_world_9
RejectConsumer业务处理发生异常,拒绝消息

死信消费者

死信消费者正在等待消息。。。。。。
死信消费者收到的消息:hello_world_3
死信消费者收到的消息:hello_world_6
死信消费者收到的消息:hello_world_9
原文地址:https://www.cnblogs.com/zhangjianbing/p/13624090.html