RabbitMQ常见面试题总结

1. 对于MQ的理解

MQ全称为Message Queue,即消息队列。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
可以看出消息的生产和消费都是异步的,生产者和消费者只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

RabbitMQ是erlang语言开发的并且开源,支持多种语言。对于消息的丢失,消息重复问题等问题都有比较成熟的解决方案。SpringBoot对于RabbitMQ提供了很好的支持,整合十分方便。
它的异步处理、服务解耦、流量控制(削峰)都是我们目前互联网系统所亟需的。



2. 怎么保证MQ的高可用

RabbitMQ 有三种模式:单机模式,普通集群模式,镜像集群模式。

  • 单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式
  • 普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。
  • 镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。


3. 如何设置消息过期时间

在RabbitMQ中分别可以对 消息和队列 设置过期时间

3.1 对消息设置过期时间

方法一:通过队列属性设置消息过期时间,队列中所有消息都会有相同的过期时间。一旦消息过期,就会从队列中抹去。
方法二:通过消息本身进行单独设置,每条消息的过期时间可以不同。若消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的【消息过期后,只有消息在队列顶端,将要被消费的时候,才回判断是否过期,过期了就移除掉】。

// 如果上面2种方法同时使用,那么消息过期时间以最先到期的时间为准。

==注意:=消息到期后,就会变成“死信”( Dead Message ),消费者将无法再收到该消息。
注:如果队列设置了死信队列,那么这条“死信”消息就会被转发到死信队列上,该消息就可以被正常消费。

方法一:通过队列属性设置

    /**
     * 1、声明交换机
     */
    @Test
    public void decalreExchange() throws Exception {
 
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
 
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,new HashMap<>());
    }





    /**
     * 2、声明队列并绑定到交换机
     */
    @Test
    public void decalreQueueAndBind() throws Exception {
 
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
 
        //将队列hello_ttl_c1 绑定到交换机hello_ttl上
        String queueName1 = "hello_ttl_c1";
        Map<String, Object> argss = new HashMap<String , Object>();
        argss.put("x-message-ttl" , 30*1000);//***************设置队列里消息的ttl的时间30s******************
        // 声明队列
        channel.queueDeclare(queueName1, true, false, false, argss);
        // 绑定队列到交换机
        channel.queueBind(queueName1, exchange, "aaa");
    }




    /**
     * 测试队列设置的ttl
     * @throws Exception
     */
    @Test
    public void sendMessage1() throws Exception {
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 消息内容
        String message = "Less is more";
        channel.basicPublish(exchange, "aaa", null, message.getBytes());
        log.debug("Producer send message:{}",message);
        channel.close();
        connection.close();
    }






  
  // ***************************SpringBoot方式***************************
    @Bean
    public Queue payQueue(){
        Map<String,Object> params = new HashMap<>();
        //设置队列的过期时间
        params.put("x-message-ttl",10000);
        //也可以使用下面这种写法
        //QueueBuilder.durable("ttl-quequ").ttl(10000).build();
        return QueueBuilder.durable("ttl-quequ").withArguments(params).build();    // 构建者那种方式
    }



    // 不采用构建者那种方式
    @Bean
    public Queue getQueue(){
        Map<String,Object> params = new HashMap<>();
        //设置队列的过期时间
        params.put("x-message-ttl",10000);
        // 参数1:队列名称    参数2:是否持久化    参数3:是否独占    参数4:是否自动删除    参数5:指定当前队列的其他信息
        return new Queue("boot-queue",true,false,false,params);
    }

   
    // 此时,若该队列收到消息后,10秒内没有被消费者消费,则该消息就会从队列中被删除消失。

方法二:通过消息本身进行单独设置

    /**
     * 1、声明交换机
     */
    @Test
    public void decalreExchange() throws Exception {
 
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
 
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,new HashMap<>());
    }



    /**
     * 2、声明队列并绑定到交换机
     */
    @Test
    public void decalreQueueAndBind() throws Exception {
 
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
 
 
        //队列hello_ttl_c2  这个是为了测试通过发送时设置ttl
        String queueName2 = "hello_ttl_c2";
        // 声明队列
        channel.queueDeclare(queueName2, true, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(queueName2, exchange, "bbb");
 
    }




    /**
     * 测试消息发送时设置ttl
     * @throws Exception
     */
    @Test
    public void sendMessage2() throws Exception {
        String exchange = "hello_ttl";
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 消息内容
        String message = "Less is more";
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.deliveryMode(2); //DeliveryMode等于2就说明这个消息是persistent的。1是默认,不是持久的。
        builder.expiration("30000");// *******************设置TTL=30000ms**********************
        AMQP.BasicProperties properties = builder. build() ;

        channel.basicPublish(exchange, "bbb", properties, message.getBytes());
        log.debug("Producer send message:{}",message);
        channel.close();
        connection.close();
    }





    //***************SpringBoot方式*********************
    // 在发送消息的时候设置过期时间
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
        @Overeide
        public Message postProcessMessage(Message message) throws AmqpException{
            message.getMessageProperties().setExpiration("30000");
            return message;
        }
    };

    rabbitTemplate.convertAndSend("exchangeName","routingKey","消息内容",messagePostProcessor); // 指定过期时间,需要MessagePostProcessor对象

3.2 对队列设置过期时间

public static void main(String[] args) throws IOException {
    // 获取连接对象
    Connection conn = RabbitMQUtil.createConn();
    // 获取连接通道
    Channel channel = conn.createChannel();

    Map<String, Object> map = new HashMap<>();
    map.put ("x-expires" , 30000);
    //给队列设置ttl
    //创建一个过期时间为 30 秒的队列
    channel.queueDeclare("ttl-quequ3",true,false,true,map);
}


// SpringBoot 方式
@Bean
public Queue payQueue(){
    Map<String,Object> params = new HashMap<>();
    //设置队列的过期时间
    params.put("x-expires",10000);
    //也可以使用下面这种写法
    //return QueueBuilder.durable(queueName).expires(5000).build();
    return QueueBuilder.durable(queueName).withArguments(params).build();
}

4. RabbitMQ如何确保消息发送与消息接收

思考如下几个问题:

  1. 如果消息已经到达了RabbitMQ,还没发送给消费者时,RabbitMQ宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ的队列有持久化机制,若消息到了RabbitMQ已经到了队列那里了,就能持久化。当RabbitMQ重连的时候消息就能发送给消费者了。

  2. 消费者在消费消息的时候,还没消费完,此时消费者宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ提供了手动ACK。当成功消费完消息的时候再手动ACK告诉生产者我消费完了。

  3. 生产者在发布消息到RabbitMQ的交换机时,由于网络问题,导致没有真发送成功到交换机,消息会丢失吗?————————————————————会,因为生产者已经执行了发布消息的方法,就会认为已经发布过去了。可利用Confirm(确认)机制实现或利用其提供的事务操作机制【影响效率,这里不介绍它】。 PS:Confirm机制是保证了消息发送到Exchange上。而消费者监听的不是Exchange,而是队列。

  4. 生产者成功发布消息到交换机了,但是交换机分发消息到队列的时候出现了问题,导致没有真分发成功,消息会丢失吗?————————————————————会,因为消费者是与队列交互的,如果消息没有分发到队列,队列就没有消息。可利用Return机制实现。 PS:Return机制是保证Exchange的消息分发到队列。


4.1 如何确保消息发送到交换机

RabbitMQ提供了confirm机制与return机制。


4.2 如何确保消息从MQ发送到消费者

RabbitMQ提供了持久化机制与手动ACK机制。


注意:RabbitMQ不会对没有ack的消息设置超时时间,他判断消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很长,以保证消息的最终一致性。若消费者返回ack之前断开了连接,RabbitMQ会重新分给下一个订阅的消费者。(可能出现消息重复消费的隐患)


总结:
可以将几种情况分为:消息到MQ的过程中搞丢,MQ自己搞丢,MQ到消费过程中搞丢。

生产者——>RabbitMQ:事务机制和Confirm机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。

RabbitMQ自身:持久化、集群、普通模式、镜像模式。

RabbitMQ——>消费者:basicAck机制、死信队列、消息补偿机制。



5. 交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理?

  • mandatory:true,返回消息给生产者
  • mandatory:false,直接丢弃


6. 什么是死信队列

出现死信消息的情况:

  • 消息过期,也就是笔者在上篇提到的 TTL。消息在队列的存活时间超过所设置的 TTL 时间。
  • 消息被拒绝。调用了 channel.basicNack 或 channel.basicReject方法,井且设置 requeue 参数为false。 requeue: 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 会立即把消息从队列中移除。
  • 队列的接收消息数长度达到最大长度。

死信交换机:
       它其实就是是一个正常的交换机,和一般交换机没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
       当某个队列中存在死信时,且该队列配置了死信交换机, RabbitMQ 就会自动地将这个消息新发布到设置的 死信交换机 上去,进而被路由到一个队列,这个队列就被称为死信队列。我们可以监听这个死信交换机的死信队列中的消息、以进行相应的处理。

原文地址:https://www.cnblogs.com/itlihao/p/14961534.html