RabbitMQ持久化

在项目中,有时候需要消息保障100%投递,我们来看下 RabbitMQ 是怎么支持的

一、RabbitMQ 持久化配置

1.1 交换机持久化配置

设置 durable 属性为 true

实例:

String exchangeType = "topic";
String exchangeName = "persistenceExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

1.2 队列持久化配置

设置 durable 属性为 true

实例:

String queueName = "persistenceQueue";
channel.queueDeclare(queueName, false, false, false, null);

1.3 消息持久化配置

设置消息属性 Delivery mode2

实例:

AMQP.BasicProperties properties =
        new AMQP.BasicProperties().builder()
                // 设置消息是否持久化
                .deliveryMode(2)
    			.build();
channel.basicPublish(exchangeName, routingKey, properties, "持久化消息".getBytes());

二、生产者确认

标记为持久化后的消息也不能完全保证不会丢失。当 RabbitMQ 接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),那么重启后,RabbitMQ 中的这条未及时保存的消息就会丢失。因为RabbitMQ 不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么就需要使用生产者确认反馈机制。

2.1 confirm 确认机制

实现原理

生产者将信道设置成 confirm 模式 ,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ackmultiple 域,表示到这个序列号之前的所有消息都已经得到了处理;

confirm 模式 最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息;

注意: confirm 机制是只保证了消息到达 exchange,并不保证消息可以路由到正确的 queue。

实例

消费者

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "topic";

        // 4.声明交换机
        String exchangeName = "confirmExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明队列
        String queueName = "confirmQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 6.将交换机和队列进行绑定关系
        String routingKey1 = "#";
        channel.queueBind(queueName, exchangeName, routingKey1);

        // 7.循环消费
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        System.err.println("消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端消费: " + msg);
        }
    }
}

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();
        // 4.开启消息确认模式
        channel.confirmSelect();
        // 5.声明交换机
        String exchangeName = "confirmExchange";
        // 6.发送消息
        channel.basicPublish(exchangeName, "message", null, "确认事件消息".getBytes());
        // 7. 添加确认监听
        channel.addConfirmListener(new ConfirmListener() {
            /**发送成功回调**/
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println(" deliveryTag=" + deliveryTag);
                System.err.println(" multiple=" + multiple);
                System.err.println("-------发送消息至MQServer成功!-----------");
            }

            /**发送失败回调**/
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println(" deliveryTag=" + deliveryTag);
                System.err.println(" multiple=" + multiple);
                System.err.println("-------发送消息至MQServer失败!-----------");
            }
        });

    }
}

先启动消费者,然后启动生产者,观察控制台输出:

消费者控制台输出:

消费端启动
消费端消费: 确认事件消息

生产者控制台输出:

 deliveryTag=1
 multiple=false
-------发送消息至MQServer成功!-----------

2.2 Return 消息机制

Return 消息是用于处理一些不可路由的消息!生产者通过指定一个 exchangeroutingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,指定的 routingkey 路由不到,要监听这种不可达的消息,就要使用 return Listener

原理

在基础 API 中有一个关键的配置项 Mandatory ,如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,则 broker 端自动删除该消息。

实例: routingKey 不存在

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();
        channel.confirmSelect();
        // 4.监听 ReturnListener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        // 5.声明交换机,使用 defaultExchange
        String exchangeName = "";
        // 6.发送消息
        // 配置Mandatory为 true,否则 RabbitMQ 会自动删除该消息,ReturnListener 无法监听
        channel.basicPublish(exchangeName, "business.a",true, null, "ReturnListener消息".getBytes());
    }
}

启动生产者,观察控制台输出:

replyCode: 312
replyText: NO_ROUTE
exchange: 
routingKey: business.a
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: ReturnListener消息

注:Return 只有MQ服务成功接收到消息,并且路由失败的时候才会回调, 当 exchange 不存在的时候, return Listener 将不会被回调。

三、消费者手工签收

RabbitMQ 服务器将消息发送给消费者后,如果没有消费者没有返回确认信息的话, RabbitMQ 服务器将会持续发送直至这个消息被消费掉或过期。

消费者确认有两种方式:

  • 自动确认模式(automatic acknowledgement model):当 RabbbitMQ 将消息发送给应用后,消费者端自动回送一个确认消息,此时 RabbitMQ 删除此消息。
  • 显式确认模式(explicit acknowledgement model):消费者收到消息后,可以在执行一些逻辑后,消费者自己决定什么时候发送确认回执(acknowledgement),RabbitMQ 收到回执后才删除消息,这样就保证消费端不会丢失消息

3.1自动确认

自动确认模式 只需要设置 autoAck = true 即可。

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

3.2 手动确认

  1. 设置关闭自动确认 autoAck = false 即可。
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, queueingConsumer);
  1. 手动确认
// 消费成功确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 消费失败,重回队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
// 消费失败,丢弃消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
//或者使用 basicReject
// 消费失败,重回队列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);    
// 消费失败,丢弃消息
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);   
原文地址:https://www.cnblogs.com/markLogZhu/p/13267826.html