RabbitMQ消费端限流策略

消息限流处理

如果 RabbitMQ 一次性将所有消息都发送给消费端,有很大几率会导致消费端崩掉,所以需要进行限流操作。让 RabbitMQ 每次最多发送指定数量的消息,一般情况下都设置数量为1。

通过调用 channel.basicQos(0, 1, false); 方法实现限流

实例

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂并进行配置相关信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2.通过连接工厂获取一个连接对象
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接对象获取数据通信信道对象
        Channel channel = connection.createChannel();

        // 4.循环发送消息
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        for (int i = 0; i < 10; i++) {
            String msg = "Hello RabbitMQ! ";
            msg += i;
            channel.basicPublish(exchange, routingKey, null, msg.getBytes());
        }

        // 5.关闭资源
        channel.close();
        connection.close();
        connectionFactory.clone();
    }
}

消费端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.创建连接工厂并进行配置相关信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2.通过连接工厂获取一个连接对象
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接对象创建一个通信信道对象
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 限流,autoAck设置为 false
        channel.basicQos(0, 1, false);
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

自定义消费处理:

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        System.err.println("body: " + new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

控制台输出:

body: Hello RabbitMQ! 0
body: Hello RabbitMQ! 1
body: Hello RabbitMQ! 2
body: Hello RabbitMQ! 3
body: Hello RabbitMQ! 4
body: Hello RabbitMQ! 5
body: Hello RabbitMQ! 6
body: Hello RabbitMQ! 7
body: Hello RabbitMQ! 8
body: Hello RabbitMQ! 9
原文地址:https://www.cnblogs.com/markLogZhu/p/13268005.html