RabbitMQ 第四课 RabbitMQ的高级特性

 

一、消息如何保障100%的投递成功?

1. 什么是生产端的可靠性投递?

(1)保障消息的成功发出

(2)保障MQ节点的成功接收

(3)发送端收到MQ节点(Broker)确认应答

(4)完善的消息进行补偿机制

2. 生产端 - 可靠性投递(BAT/TMD 互联网大厂的解决方案)

(1)消息落库,对消息状态进行打标(思考:在高并发的场景下是否适合?)

 (2)消息的延迟投递,做二次确认,回调检查(节省了数据落库的这一步)

  

二、幂等性概念

一句话概括:可能你要对一件事情进行操作100次,1000次,结果都是相同的。比如 对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1 where version = 1

1. 消费端 - 幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题
1. 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。

业界主流的幂等性操作解决方案:
(1)唯一Id + 指纹码 机制,利用数据库主键去重
  1)select count(1) from T_order where id = 唯一ID + 指纹码
  2)好处:实现简单
  3)坏处:高并发下有数据库写入的性能瓶颈
  4)解决方案:跟进ID进行分库分表进行算法路由
(2)利用Redis的原子性取实现
  1)第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
  2)第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。

三、 Confirm确认

(1)消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们产生一个应答。

(2)生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

如何实现Confirm确认消息?

第一步:在channel上开启确认模式:channel.confirmSelect() 

第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {

    //交换机名称
    private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null);
        //指定消息投递模式为:消息的确认模式
        channel.confirmSelect();
        //设置confirm返回监听
        channel.addConfirmListener(new ConfirmListener() {
            //1.处理失败场景,deliveryTag:消息的唯一标签
            //失败场景:磁盘写满了,队列数达到上限了mq出问题了等
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("-----------no ack------------" + deliveryTag + " | " + multiple);
            }
            //2.处理成功场景,deliveryTag:消息的唯一标签
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("-----------ack------------" + deliveryTag + " | " + multiple);
            }
            //3.第三种情况,Ack 和No Ack都没有收到,这就需要可靠性投递来解决。假设Broker端返回的确认突然出现网络的闪断,
            //那我连ACK到底成功还是失败都不知道,那怎么办呢?用定时任务取抓取一些中间状态的消息,然后重新触发发送,补偿。
        });

        String message = "发送一条需要确认的消息!!!!";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        System.out.println("生产者 send :"+message);
        channel.close();
        connection.close();
        System.out.println("over");
    }
}
Producer.java(Confirm机制)

四、Return消息机制

1. Return Listener用于处理一些不可路由的消息

2. 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作。

3. 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

4. 在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker会自动删除该消息。

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;

public class Producer {
    
    private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm";//交换机名称

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null);
        //设置消息发送后匹配失败时的处理方法
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode,String replyText,String exchange,
                                String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {
                System.out.println("-----------ReturnListener start ------------");
                System.out.println(replyCode + " | " + replyText  + " | " +  exchange  + " | " +  routingKey +  " | " +  properties + " | " + new String(body));
                System.out.println("-----------ReturnListener end ------------");
            }
        });
        String message = "发送一条需要确认的消息!!!!";
        
        boolean mandatory = true; //[ˈmændətəri] adj.强制的; 法定的; 义务的; n.    受托者;   
        channel.basicPublish(EXCHANGE_NAME, "", mandatory, null, message.getBytes());
        System.out.println("生产者 send :"+message);
        channel.close();
        connection.close();
        System.out.println("over");
    }
}
Producer.java(Return机制)

  

五、消费端限流

  假设一个场景,首先,我们Rabbitmq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,就会出现如下情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同事处理这么多数据。

  RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置QoS的值)未被确认前,不尽兴消费新的消息。

限流API

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

参数讲解:

prefetchSize:消息的最大大小,比如5M,0为不限制;

prefetchCount:最多发送多少条消息,实际工作中设置为1。会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,

即一旦有N个消息还没有ack,则该consumer将block(阻塞)掉,直到有消息ack.

global:是否将上面设置应用于channel级别还是consumer级别。true:在channel通道级别做限制;false:在consumer级别做限制


注意:
prefetchSize和global这两项,Rabbitmq没有实现,暂且不研究,一个设置为0,一个设置为false就好了。
prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。(必须手动签收,不能自动签收)

//同一时刻服务器只会发一条数据给消费者
        channel.basicQos(0, 1, false);
//        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println("收到消息:"+message);
                
                boolean multiple = false; //是否批量签收
                //这个方法会主动回送给Broker一个应答,表示这条消息我已经处理完了,你可以再给我下一条了
                channel.basicAck(envelope.getDeliveryTag(),multiple); 
            }
        };
        boolean autoAck = false; //不自动确认
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
注意:
如果 channel.basicAck(envelope.getDeliveryTag(),multiple); 代码被注释掉,则收到一条消息后则没法再继续处理了下一条消息了。
比如生产者发了5个,处理完第一个后,就卡在这了,其余4条没有继续处理。


六、消息的ACK(手动确认)与NACK(重回队列)

消费端的手工ACK和NACK
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。

消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新回递给Broker!
一般我们在实际应用中,都会关闭重回队列,也就是设置为false
boolean autoAck = false; //不自动确认
channel.basicConsume(QUEUE_NAME,autoAck,consumer);


/**
 * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了
 * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可
 */
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    
     private static final String EXCHANGE_NAME = "test_exchange_topic";
     private final static String ROUTING_KEY_NAME = "order.update";
     
     public static void main(String[] args) throws IOException, TimeoutException {
         Connection connection = ConnectionUtil.getConnection();
         Channel channel = connection.createChannel();
         //声明交换机
         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
         //发送五条消息,属性参数properties
         for (int i = 0; i < 3; i++) {
            //【设置属性参数】:AMQP.BasicProperties() start
             Map<String,Object> headers = new HashMap<>();
             headers.put("id", i);
             headers.put("name", "xiaochao");
             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                     .contentEncoding("UTF-8")
                     .deliveryMode(2)
                     .expiration("10000")
                     .headers(headers)
                     .build();
             String message = "匹配insert" + i;
             channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes());
         }
         channel.close();
         connection.close();
         System.out.println("game over");
     }
 }
Producer.java

import java.io.IOException;

import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
 public class Consumer1 {
     
     private static final String EXCHANGE_NAME = "test_exchange_topic";
     private  static final String QUEUE_NAME = "test_queue_topic_1";
     private final static String ROUTING_KEY_NAME = "order.*";//order.#
 
     public static void main(String[] args) throws IOException {
         Connection connection = ConnectionUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME);
         //channel.basicQos(1); //允许一次多个消息进到队列里来
         Consumer consumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                 throws IOException {
                 super.handleDelivery(consumerTag, envelope, properties, body);
                 System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8"));
                 if(properties.getHeaders().get("id").equals(1)) {
                     try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                     channel.basicNack(envelope.getDeliveryTag(), false, true);
                 } else {
                     channel.basicAck(envelope.getDeliveryTag(), false);
                 }
             }
         };
         channel.basicConsume(QUEUE_NAME,false,consumer);
     }
 }
Consumer.java
返回结果:
0 | 匹配insert0
1 | 匹配insert1
2 | 匹配insert2  (由于没有限制basicQos(1):允许一次多个消息进来,所以会先消费所有队列里的,然后)
1 | 匹配insert1
1 | 匹配insert1
1 | 匹配insert1
1 | 匹配insert1

id为1的重发了4次,总共5次,
如果将channel.basicQos(1);放开,每次只允许一个进来,那么结果如下:
0 | 匹配insert0
1 | 匹配insert1
1 | 匹配insert1(由于basicQos(1):一次只允许一个消息进来,所以1会一直重发)
1 | 匹配insert1
1 | 匹配insert1
1 | 匹配insert1

七、TTL队列 / 消息(Time To Live)生存时间

1. RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

2. RabbitMQ支持队列的过期时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动地被删除

界面操作步骤:

1. 创建队列并设置超时时间

2. 创建Exchange 

3.  Exchange上绑定路由

 4. 在能被路由到的队列里也可以看到刚刚由Exchange里创建的绑定

 5. Exchange中发消息

 6. 在队列里看到了这条消息

7. 10秒后消息消失

 

8. 发消息时可以设定超时属性:

 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .expiration("10000").build();

八、死信队列(DLX:Dead-Letter-Exchange)

 0. 何为死信、死信队列

死信:当一个消息没有消费者取消费,此消息就是死信了。任何MQ中都有死信的概念。
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

1. 消息变成死信的几种情况

1. 消息被绝(basic.reject / basic.nack) 并且requeue=false:不需要重回队列了
2. 消息TTL过期
3. 队列达到最大长度

  DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。


2. 死信队列的具体实现实战

正常声明交换机、队列、绑定:
(1)Exchange:dlx.excahnge
(2)Queue:dlx.queue
(3)RoutingKey: #
(4)队列上加参数:argumentgs.put("x-dead-letter-exchange","dlx.exchange");


启动消费端,创建交换机和队列以及它们之间的绑定关系,然后关闭消费者端程序,使其无法接收并处理队列信息 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
 public class Consumer1 {
     
     private static final String EXCHANGE_NAME = "test_dlx_exchange";
     private  static final String QUEUE_NAME = "test_dlx_queue";
     private final static String ROUTING_KEY_NAME = "dlx.#";
 
     public static void main(String[] args) throws IOException {
         Connection connection = ConnectionUtil.getConnection();
         Channel channel = connection.createChannel();
         
         //1. 声明(创建)正常接收消息的队列
         Map<String,Object> arguments = new HashMap<>();
         arguments.put("x-dead-letter-exchange", "dlx.exchange");  //设置死信队列参数:交换机名称
         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
         channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); //本队列绑定死信队
         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME);
         channel.basicQos(1); //允许一次多个消息进到队列里来
         
         //2. 声明(创建)死信队列
         channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
         channel.queueDeclare("dlx.queue", true, false, false,null);
         channel.queueBind("dlx.queue", "dlx.exchange", "#");
         
         Consumer consumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                 throws IOException {
                 super.handleDelivery(consumerTag, envelope, properties, body);
                 System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8"));
                 boolean multiple = false;
                 if(properties.getHeaders().get("id").equals(1)) {
                     try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    boolean requeue = false; //重回队列
                    channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
                 } else {
                     channel.basicAck(envelope.getDeliveryTag(), multiple); 
                 }
             }
         };
         boolean autoAck = false; //是否自动确认,设置为否
         channel.basicConsume(QUEUE_NAME,autoAck,consumer);
     }
 }
Consumer1.java(队列绑定死信队列)
启动消息发送者程序 
/**
 * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了
 * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可
 */
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.everjiankang.dependency.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    
     private static final String EXCHANGE_NAME = "test_dlx_exchange";
     private final static String ROUTING_KEY_NAME = "dlx.save";
     
     public static void main(String[] args) throws IOException, TimeoutException {
         Connection connection = ConnectionUtil.getConnection();
         Channel channel = connection.createChannel();
         //声明交换机
         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
         //发送五条消息,属性参数properties
         for (int i = 0; i < 1; i++) {
            //【设置属性参数】:AMQP.BasicProperties() start
             Map<String,Object> headers = new HashMap<>();
             headers.put("id", i);
             headers.put("name", "xiaochao");
             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                     .contentEncoding("UTF-8")
                     .deliveryMode(2)
                     .expiration("10000")
                     .headers(headers)
                     .build();
             String message = "hello DLX message" + i;
             channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes());
         }
         channel.close();
         connection.close();
         System.out.println("game over");
     }
 }
Producer.java(设置过期时间)

界面端现象:
1.启动消费者端程序,然后暂停消费者端程序,再启动生产者发生消息,消息首先发送到了test_dlx_queue

2. 由于消费端程序被关闭了,且消息又设置了过期时间,所以10s后RabbitMQ将消息转至死信队列
 
原文地址:https://www.cnblogs.com/guchunchao/p/13139428.html