(四)RabbitMQ:RabbitMQ进阶

1.生产者确认

在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题。
RabbitMQ针对这个问题,提供了两种解决方式:

  • 通过事务机制实现。
  • 通过发送方确认(publisher confirm)机制实现。

1.1事务机制

RabbitMQ客户端中与事务机制相关的方法有三个:

  1. channel.txSelect:用于将当前信道设置成事务模式。
  2. channel.txRollback:用于提交事务。
  3. channel.txRollback:用于事务回滚。

使用代码示例:

        channel.txSelect();
        channel.basicPublish("exchange_name","routing_key",null,"msg info".getBytes());
        channel.txCommit();


开启事务机制与不开启相比多了四个步骤:

  • 客户端发送Tx.Select,将信道设置为事务模式。
  • Broker回复Tx.Select-Ok,确认已将信道设置为事务模式。
  • 在发送完消息之后,客户端发送Tx.COmmit提交事务。
  • Broker回复Tx-Commit-Ok,确认事务提交。

回滚代码示例:

        try {
            channel.txSelect();
            channel.basicPublish("exchange_name", "routing_key", null, "msg info".getBytes());
            int result = 1 / 0;
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
            channel.txRollback();
        }

事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可以在捕获异常之后进行事务回滚,与此同时可以进行消息重发。当时使用事务机制会“吸干”RabbitMQ的性能。

1.2发送方确认机制(publisher confirm)

生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后。RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。

发送方确认机制最大的好处在于它是异步的,相比事务机制在一条消息发送之后会使发送端阻塞。

如何实现Confirm确认消息:

  1. 在channel上开启确认模式:channel.confirmSelect()。
  2. 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理。

生产端代码示例:

import com.mine.rabbitmq.rabbitmqbegin.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchange = "exchange_test_confirm";

        String routing_key = "confirm.save";


        //指定我们的消息投递模式: 消息的确认模式
        channel.confirmSelect();

        //发送一条消息
        String msg = "message info confirm";
        channel.basicPublish(exchange, routing_key, null, msg.getBytes());

        //添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("-----------------ack-------------------");
                System.out.println("deliveryTag:" + deliveryTag);
                System.out.println("multiple:" + multiple);

            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("-----------------no ack-------------------");
                System.out.println("deliveryTag:" + deliveryTag);
                System.out.println("multiple:" + multiple);
            }
        });

        //记得要关闭相关的连接
        //channel.close();
        //connection.close();
    }
}

handleNack(no ack)出现情况有:磁盘写满,队列到达上限等。

消费端代码示例:

import com.mine.rabbitmq.rabbitmqbegin.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchange = "exchange_test_confirm";

        String queue = "queue_test_confirm";

        String routing_key = "confirm.#";


        channel.exchangeDeclare(exchange, "topic", false, false, false, null);

        channel.queueDeclare(queue, true, false, false, null);

        channel.queueBind(queue, exchange, routing_key);

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);

                System.out.println("消费端:" + new String(body));
            }
        };
        
        channel.basicConsume(queue, true, consumer);
    }
}

2.消息何去何从

2.1mandatory参数

  • mandatory:当mandatory参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。(mandatory参数告诉服务器至少将消息路由到一个队列中,否则将消息返回给生产者。)
    那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel.addReturnListener来添加ReturnListener监视器实现。
    示例代码:
import com.mine.rabbitmq.rabbitmqbegin.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchange = "exchange_test_return";

        String routing_key = "return.save";

        String routing_key_error = "abc.save";

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------handle  return----------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        });

        //发送一条消息
        String msg = "message info return";
        String msg_error = "message info error return";

        //交换器无法根据自身的类型和路由键找到一个符合条件的队列 mandatory设置为true,消息才会返回给生产者,否则直接丢弃。
        channel.basicPublish(exchange, routing_key, true,null, msg.getBytes());
        channel.basicPublish(exchange, routing_key_error, true,null, msg_error.getBytes());


        //记得要关闭相关的连接
        //channel.close();
        //connection.close();
    }
}
import com.mine.rabbitmq.rabbitmqbegin.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchange = "exchange_test_return";

        String queue = "queue_test_return";

        String routing_key = "return.#";


        channel.exchangeDeclare(exchange, "topic", false, false, false, null);

        channel.queueDeclare(queue, true, false, false, null);

        channel.queueBind(queue, exchange, routing_key);

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);

                System.out.println("消费端:" + new String(body));
            }
        };
        
        channel.basicConsume(queue, true, consumer);
    }
}

Return机制流程图:

2.2immediate参数

  • immediate:immediate参数设置为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。(immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。)
    RabbitMQ3.0版本开始去掉了对immediate参数的支持,对此RabbitMQ官方解释是:immediate参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。

2.3备份交换器

备份交换器,英文名称为 Alternate Exchange,简称AE,或者更直白地称之为“备胎交换器”。生产者在发送消息的时候如果不设置 mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了 mandatory参数,那么需要添加 Returnlistener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ中,再在需要的时候去处理这些消息。可以通过在声明交换器(调用 channe1. exchange Declare方法)的时候添加alternate- exchange参数来实现,也可以通过策略( Policy)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy的设置

  Map<string,Object> args=new HashMap<string,Object>();
  args.put("alternate-exchange","myAe");
  channel.exchangeDeclare("normalExchange",direct", true, false, args);
  channel.exchangeDeclare("myAe","fanout", true, false, null);
  channel.queueDeclare("normalQueue", true, false, false, null);
  channel.queueBind("normal Queue","normalExchange","normalKey");
  channel.queueDeclare("unroutedQueue", true, false, false, null);
  channel.queueBind("unroutedQueue","myAe","");

对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定仼何队列,客户端和 RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和 mandatory参数一起使用,那么 mandatory参数无效。

3.消费端要点

3.1消费端限流

什么是消端的限流?
假设一个场景,首先,我们 RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume或者 channel设置Qos的值)未被确认前,不进行消费新的消息。

    /**
     * Request a specific prefetchCount "quality of service" settings
     * for this channel.
     *
     * @see #basicQos(int, int, boolean)
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchCount) throws IOException;
  • prefetchCount:prefetch Count:会告诉 RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该 consumer将 block掉,直到有消息ack。
    消费端示例代码:
		String exchangeName = "test_qos_exchange";
		String queueName = "test_qos_queue";
		String routingKey = "qos.#";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//1 限流方式  第一件事就是 autoAck设置为 false
		
		channel.basicQos(0, 1, false);
		Boolean autoAck=false;
		channel.basicConsume(queueName, autoAck, new MyConsumer(channel));

3.2消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为msg1、msg2、msg3,那么消费者必然也是按照msg1、msg2、msg3的顺序进行消费的。
目前很多资料显示 RabbitMQ的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何 RabbitMQ的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker的前后顺序,也就无法验证消息的顺序性。

3.3弃用QueueingConsumer

QueueingConsumer本身有几个大缺陷,需要在使用时特别注意。首当其冲的就是内存溢出的问题,如果由于某些原因,队列之中堆积了比较多的消息,就可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消化。
QueueingConsumer还包含(但不仅限于)以下一些缺陷:

  • QueueingConsumer会拖累同一个 Connection下的所有信道,使其性能降低。
  • 同步递归调用 QueueingConsumer会产生死锁。
  • RabbitMQ的自动连接恢复机制(automatic connection recovery)不支持 QueueingConsumer的这种形式。
  • QueueingConsumer不是事件驱动的。

4.过期时间

RabbitMQ可以对消息和队列设置过期时间TTL

4.1设置消息的TTL

设置消息TTL的两种方法:

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息本身进行单独设置,每条消息的TTL可以不同。

如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。
消息在队列中的生存时间一旦超时设置的TTL值时,就会变成“死信”(Dead Message),消息者将无法再收到该消息(不绝对,可通过配置“死信”交换器路由到相应的队列)

通过队列属性设置消息TTL的方法是在channel.queueDeclare方法中加入x-message-ttl参数实现的,单位是毫秒

        Map<String, Object> arguments = new HashMap<>();
        //设置队列消息过期时间为6s
        arguments.put("x-message-ttl", 6000);
        channel.queueDeclare("queue_name", true, false, false, arguments);

针对每条消息设置TTL的方法是再channel.basicPublish方法中加入expiration的属性参数,单位是毫秒

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("utf-8")
                .expiration("10000")
                .headers(headers)
                .build();

        String msg = "message info";

        channel.basicPublish("exchange_name", "queue_name", properties, msg.getBytes());

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判断的。

为什么这两种方法处理的方式不一样?
第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

4.2设置队列的TTL

通过channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ重启后,持久化的队列的过期时间会被重新计算。

        Map<String, Object> arguments = new HashMap<>();
        //创建一个过期时间为1分钟的队列(表示如果1分钟之内未使用则会被删除)
        arguments.put("x-expires", 60000);
        channel.queueDeclare("queue_name", true, false, false, arguments);

5.死信队列

DLX,全称为Dead-Letter-Exchange,可以称为死信交换器,当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。
消息变成死信一般是由于这几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false。
  • 消息过期。
  • 队列达到最大长度。
    DLX也是一个正常的交换器,和一般交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理,这个特性与将消息的TTL设置为0配合使用可以弥补immediate参数的功能。
    通过channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX:
        //创建DLX:dlx_exchange
        channel.exchangeDeclare("dlx_exchange", "direct");

        Map<String, Object> arguments = new HashMap<>();
        //为队列添加DLX
        arguments.put("x-dead-letter-exchange", "dlx_exchange");
        //也可以为这个DLX指定路由键,如果没有特殊指定,则使用原队列的路由键
        arguments.put("x-dead-letter-routing-key", "dlx-routing-key");

        channel.queueDeclare("queue_name", false, false, false, arguments);

完整代码示例:


6.延迟队列

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过DLX和TTL模拟出延迟队列的功能。

7.优先级队列

具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
可以通过设置队列的x-max-priority参数来实现:

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-max-priority", 10);

        channel.queueDeclare("queue_name", false, false, false, arguments);

上面代码配置一个队列的最大优先级。在此之后,需要在发送消息中设置消息当前的优先级。

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .priority(5)
                .build();

        String msg = "message info";
        channel.basicPublish("exchange_name", "queue_name", properties, msg.getBytes());

上面的代码中设置消息的优先级为5.默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个这个也是有前提的:如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有上面实际意义。

8.持久化

持久化可以提高RabbitMQ的可靠性,以防止在异常情况(重启,关闭,宕机等)下的数据丢失。
RabbitMQ的持久化分为三个部分:交换器的持久化,队列的持久化,消息的持久化。

  • 交换器的持久化:是通过在声明队列时,将durable参数设置为true实现的。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。
  • 队列的持久化:是通过在声明队列时,将durable参数置为true实现的,如果队列不设置持久化,那么在RabbitMQ服务器重启之后,相关队列的元数据会丢失,此时消息数据也会丢失。
  • 消息的持久化:队列的持久化保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中deliveryMode属性)设置为2即可实现消息的持久化。

设置了队列和消息的持久化,当 RabbitS服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

9.RPC实现

后续补充

原文地址:https://www.cnblogs.com/everyingo/p/12906320.html