RabbitMQ进阶笔记

一、消息发布确认与回退

在RabbitMQ中生产者发布消息,需要先经过exchange,由交换机分法到不同的Queue中在在这过程中,我们不能确定消息是否真正的到达了exchange,又是否真正的从exchange路由的到达了Queue。在这个过程中可能会出现生产者发布的消息丢失的情况。默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的。BasicPublish方法的返回值是void。要想知道消息准确的到达exchange和queue我们需要利用RabbitMQ的发布确认机制和回退机制。producer--->rabbitmq broker--->exchange--->queue--->consumer

  1. 搭建SpringBoot环境

    pom依赖

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
        </dependencies>
    

    yml配置

    spring:
      rabbitmq:
        host: 127.0.0.1 #主机
        port: 5672 # 端口
        username: guest #用户
        password: guest #密码
        virtual-host: /rabbitmqdemo #虚拟主机
        connection-timeout: 15000 #连接超时时间
        publisher-returns: true  # 开启回退模式
        publisher-confirm-type: correlated # 开启发布确认,
        # 如果是低版本RabbitMQ发配确认的配置为
        #publisher-confirms: true
    

    编写配置类RabbitMQConfig

    @Configuration
    public class RabbitMQConfig {
    
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
    		//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用返回调函数
            rabbitTemplate.setMandatory(true);
            /**
             * 消息的可靠性投递 投递消息给交换机执行的回调函数
             * correlationData:相关配置信息
             * ack: exchange是否收到消息的确认信号
             * cause:原因
             */
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     "+"相关数据correlationData:"+correlationData);
                    System.out.println("ConfirmCallback:     "+"确认情况ack:"+ack);
                    System.out.println("ConfirmCallback:     "+"原因cause:"+cause);
                }
            });
    
            /**
             * Exchange路由到Queue失败会执行ReturnCallback
             * 1.开启回退模式
             * 2.给rabbitTemplate注入ReturnCallback
             * 3.设置Exchange处理消息的模式
             *      1.如果消息没有路由到Queue,则丢弃消息(默认);
             *      2.如果消息没有路由到Queue,则消息返回给发送方ReturnCallback
             */
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     "+"消息message:"+message);
                    System.out.println("ReturnCallback:     "+"回应码replyCode:"+replyCode);
                    System.out.println("ReturnCallback:     "+"回应信息replyText:"+replyText);
                    System.out.println("ReturnCallback:     "+"交换机exchange:"+exchange);
                    System.out.println("ReturnCallback:     "+"路由键routingKey:"+routingKey);
                }
            });
    
            return rabbitTemplate;
        }
    
        /**
         * 交换机
         * @return
         */
        @Bean("bootExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange("boot_topic").durable(true).autoDelete().build();
        }
    
        /**
         * 队列
         * @return
         */
        @Bean("bootQueue")
        public Queue bootQueue(){
    
            return QueueBuilder.durable("boot_queue").build();
        }
    
        /**
         * 绑定交换机和队列
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    
    }
    
    

    上边我们创建了topic类型的交换机boot_topic并且绑定了boot_queue队列,并设置routeKey为boot.#,同时给RabbitTemplate注入两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback。

    第一种情况:消息没能到exchange(故意把交换机写错)

    @RequestMapping("/confirm")
        public String testConfirm(){
           
            rabbitTemplate.convertAndSend("boot_topic23","222.boot.haha","消息发布确认机制!");
            return "";
        }
    

    结果如下: 执行了ConfirmCallback 回调函数ack为false,并返回了失败原因

    image-20201027144935781

第二种情况,消息到了交换机但是没有到队列

@RequestMapping("/confirm")
    public String testConfirm(){
        rabbitTemplate.convertAndSend("boot_topic","444.boot.haha","消息发布确认机制!");
        return "";
    }

结果如下:由于消息没有到达队列执行了ReturnCallback函数,但是到达了交换机,所有ConfirmCallback回调函数的ack为true

image-20201027150602188

第三种情况,消息成功到达交换机和队列

 @RequestMapping("/confirm")
    public String testConfirm(){
        rabbitTemplate.convertAndSend("boot_topic","boot.haha","消息发布确认机制!");
        return "";
    }

结果如下:ConfirmCallback回调函数的ack为true,没有执行ReturnCallback函数

image-20201027150843205

小结:

  • 消息到达exchange,ConfirmCallback回调函数的ack返回true,没有到达返回false
  • 消息到达exchange,没有到达Queue,会执行ReturnConfirm回调函数

注意:为了保证消息的持久化,我们也应该做到,exchange持久化,queue持久化,以及message的持久化,以免服务重启后消息丢失。

在RabbitMQ中也提供了事务机制,但是性能比较差,使用channel下列方法,完成事务控制:
txSelect(), 开启事务
txCommit(),用于提交事务
txRollback(),用于回滚事务

二、消费者自动确认机制与限流

RabbitMQ在生产方有发布确认,同样在消费者放也存在消费者接受确认确认,消费者接受确认有三种方式

  • 自动确认(默认):acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 自动,根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法或者channel.basicReject()方法,让其自动重新发送消息。

配置消费者手动确认并限流

spring:
    listener:
      simple:
      	prefetch: 1 # 消费端限流,每次只从队列种取一个消息
        acknowledge-mode: manual # 配置接受手动确认

消费者代码处理

@Component
public class RabbitMQListener  {

    /**
     *       acknowledge-mode: manual  # 签收机制为手动
     *      * 如果消息成功处理则调用Channel的basicAck()签收
     *      * 处理失败则调用Channel的basicNack()拒绝签收
     * @param message
     * @param channel
     * @throws InterruptedException
     * @throws IOException
     *	这里通过@RabbitListener(queues = "work")直接指定监听的队列,是因为我已经创建好了
     */
    @RabbitListener(queues = "work")
    public void rabbitmqListener(Message message, Channel channel) throws InterruptedException, IOException {
        TimeUnit.SECONDS.sleep(5);
        //获取消息tag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("签收成功!");
            channel.basicAck(deliveryTag,true);
        } catch (IOException e) {
            /**
            *第一个参数:消息标签
            *第二个参数:是否可以确定多个消息
            *第三个参数:是否重回消息队列
            */
            channel.basicNack(deliveryTag,true,true);
            //拒绝一条消息
            //channel.basicReject();
        }
    }
}

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true)方法根据deliveryTag确认签收消息。

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

三、TTL(过期时间)

Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,当消息到达存活时间后,还没有被消费,会被自动清除。在RabbitMQ可以对消息和队列设置TTL(单位默认是毫秒)。

  • RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。

  • RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,队列过期后,会将队列所有消息全部移除。

  • 如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。

注意:

RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。。

设置队列的过期时间:

	
	@Configuration
public class RabbitMQConfig {

    /**
     * 交换机
     * @return
     */
    @Bean("ttlExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.directExchange("ttl_direct").durable(true).autoDelete().build();
    }

    /**
     * 创建过期的队列
     * @return
     */
    @Bean("ttl_queue")
    public Queue bootQueue(){
        return QueueBuilder.durable("ttl_queue").ttl(6000).build();
    }

    /**
     * 绑定交换机和队列
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("ttl_queue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        System.out.println("队列绑定交换机");
        return BindingBuilder.bind(queue).to(exchange).with("ttl_queue").noargs();
    }

}				                                 

不知道创建队列可以设置哪些参数的可以去UI控制台查看

image-20201027210234315

设置消息的过期时间:

@GetMapping("/ttl")
    public String ttl(){
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //默认单位是毫秒
                message.getMessageProperties().setExpiration("30000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间30s",messagePostProcessor);
        return "";
    }

	//也可以这样
    @GetMapping("/ttl2")
    public String ttl2(){
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //默认单位是毫秒
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间10秒",messagePostProcessor);
        return "";
    }

四、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机)。当消息在一个队列中变成死信之后,如果该队列绑定了死信交换机,则他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列,当然死信交换机路由到死信队列一样需要RoutingKey。本质上来说,死信交换机和死信队列和普通的没有区别。

消息在以下三种情况会成为死信

  • 队列消息长度到达限制,新加入的消息会成为死信。
  • 消费者拒接消费消息,并且不重回队列会成为死信。
  • 原队列存在消息过期设置,消息到达超时时间未被消费会成为死信。

队列绑定死信交换机需要设置两个参数

  • x-dead-letter-exchange :来标识一个交换机
  • x-dead-letter-routing-key:RoutingKey,当消息变为死信被转发到死信交换机后,死信交换机会根据这个RoutingKey路由到匹配的死信队列上。

image-20201027212230338

代码如下:

//修改RabbitMQConfig的配置文件如下
@Configuration
public class RabbitMQConfig {

    /**
     * 交换机
     * @return
     */
    @Bean("ttlExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.directExchange("ttl_direct").durable(true).autoDelete().build();
    }

    /**
     * 创建过期队列队列
     * @return
     */
    @Bean("ttl_queue")
    public Queue bootQueue(){
        //设置队列的过期时间为6秒,并且绑定死信交换机dead_exchange,并且设置routingKey为deadKey
        return QueueBuilder.durable("ttl_queue").ttl(6000).deadLetterExchange("dead_exchange").deadLetterRoutingKey("deadKey").build();
    }

    /**
     * 绑定交换机和队列
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("ttl_queue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        System.out.println("队列绑定交换机");
        return BindingBuilder.bind(queue).to(exchange).with("ttl_queue").noargs();
    }

    /**
    *创建死信交换机
    */
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange("dead_exchange").durable(true).autoDelete().build();
    }
	
    /**
    *创建死信队列
    */
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable("dead_queue").build();
    }

    /**
    *绑定死信队列和死信交换机
    */
    @Bean
    public Binding bindDeadQueueWithExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange){
       return BindingBuilder.bind(queue).to(exchange).with("deadKey").noargs();
    }
}

测试代码:

	@GetMapping("/ttl")
    public String ttl(){
        rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间");
        return "";
    }

结果:当消息过期成为死信就会经过死信交换机路由到死信队列,消息成为死信有三种情况,这里演示了消息因过期而成为死信的情况

image-20201027221319723

五、延迟机制

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。可惜的是AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

img

简单说一下,就是生产者将消息发送到一个具有过期时间的队列当中,这个过期时间就是我们想要延迟的时间,但是消费者并不直接从这个队列种取出消息,我们需要将这个队列绑定一个死信交换机,当消息过期时,被转发到死信交换级经过routingKey路由到匹配的死信队列,消费者就从这个死信队列中取出消息进行消费,以此到达延迟队列的效果。

六、消息追踪之rabbitmq_tracing

rabbitmq_tracing是Rabbitmq的一款图形化的消息追踪插件,它能跟踪RabbitMQ中消息的流入流出情况

# 启动rabbitmq_tracing插件
 rabbitmq-plugins enable rabbitmq_tracing

开启插件后可以在RabbitMQ的UI界面添加tracing追踪

image-20201028110300583

对new trace中部分字段的解释:

  • Format:表示输出的消息日志格式,有Text和JSON两种, JSON格式的payload(消息体)默认会采用Base64进行编码更安全。
  • Max payload bytes:表示每条消息的最大限制,单位为B,如果消息超过了每条消息的最大限制就会被截断。
  • Pattern:用来设置匹配的模式,#(匹配所有消息的流入流出),publish.# (”匹配所有消息流入),deliver.#(”匹配所有消息流出) ,#.amq.directueue(指定交换机), #.myqueue(指定一个Queue)

新建trace后Queue中会多出一个队列

image-20201028112511235

tracing栏目下的all traces也会多出一条记录

image-20201028112726652

可以点击my--tracing.log查看日志,但是需要输入账户和密码。新建trace后默认的log文件会保存在/var/tmp/rabbitmq-tracing目录下。

# 关闭rabbitmq_tracing插件
 rabbitmq-plugins disable rabbitmq_tracig

有关docker搭建RabbitMQ集群可以参考这篇文章:https://www.cnblogs.com/vipstone/p/9362388.html

原文地址:https://www.cnblogs.com/myblogstart/p/13890218.html