RabbitMQ消息中间件(第三章)第一部分-笔记

RabbitMQ高级特性

本章导航

  • 消息如何保障100%的投递成功?
  •  幂等性概念详解
  • 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
  • Confirm确认消息、Return返回消息
  • 自定义消费者
  • 消息的ACK与重回队列
  • 消息的限流
  • TTL消息
  • 死信队列

消息如何保障100%的投递成功?

什么是生产端的可靠性投递?

  • 保障消息能成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的消息进行补偿机制

生产端-可靠性投递

BAT/TMD 互联网大厂的解决方案:

  • 消息落库,对消息状态进行打标 (将消息持久化到数据库,通过发送并变更数据表消息状态,1表示消息成功,2表示失败,3表示发送中,对于3超时无响应的消息可进行数据库轮询查询,然后在重新发送等操作,可配置重试的临界值,比如5次或4次之后该消息结束重试,变更为2)

   消息信心落库,对消息状态进行打标:

  

   中间蓝色框属于生产者,在实际的开发中,生产者不会先发送消息到MQ Broker上,而是现在消息持久化到数据库里,通常存放一个业务DB和消息DB,

  并发量不大的话也可以将两者放到一个DB里存放。流程:生产端producer -> 数据库持久化Step1(标记MSG DB消息为0发送中)->发送消息Step2到MQ Broker

  ->生产端会有监听Confirm Listener负责接收Step3:消息的确认-> 找到MSG DB对应的消息(标记为1,发送成功)

  在生产中也会出现极端情况,例如Step2发送到MQ Broker无响应,或者MQ Broker发送到生产端监听出现网络不稳定情况,导致接收失败,这时候就需要系统的补偿

  机制,通过分布式定时任务,轮询查询MSG DB状态0,重试次数小于3次则重新发送消息到MQ,这里谨记,重试的时间间隔不能太短,否则导致消息下一秒就要接收到

  时又重发消息到MQ上。根据实际情况配置重试间隔时间,例如2,5,10。不过这种方式的缺点就是会多次请求DB。所以就有了【消息的延迟投递,做二次确认,回调检查】

  减少与DB间的请求处理。

  • 消息的延迟投递,做二次确认,回调检查

   

  upstream service:上游服务

  downstream service:下游服务

  callback service:相当于补偿服务

  流程: upstream service上游服务将消息的业务存入BIZ DB,这里只存业务 -> Step1 第一次发送消息到MQ Broker,同时2分钟后发送第二条消息延迟投递检查到MQ Broker

  -> Step3下游服务通过监听获取到消息 -> Step4下游服务构建消息在发送给MQ Broker -> Step5这时候callback service会去监听这条消息,获取成功后存入MSG DB -> 而上游服务

  发送的消息延迟投递检查也会被callback service监听到,不过这里监听的与下游发送的消息不能同个队列,callback service收到消息延迟投递检查后,会去MSG DB里检查这条消息

  是否处理成功,成功则结束,失败(Step3、Step4、Step5处理或接受消息无响应)则会发送RPC ReSend Command到上游服务,告诉它处理失败在重新发送。

  优点:在于业务DB和消息DB解耦,上游服务一开始只要存业务的DB,省去存放消息DB,只有真正接收成功才会存放消息DB,减少不必要的DB请求,同时callback service也属于补偿

  机制。 

幂等性概念详解

  •  我们可以借鉴数据库的乐观锁机制
  • 比如我们执行一条更新库存的SQL语句
  • UPDATE T_REPS SET COUNT = COUNT - 1, VERSION = VERSION + 1 where VERSION = 1

  幂等性其实是说可能对一个事情进行操作,这个操作可能执行一百次一千次,那我们最终的操作结果这一百次一千次的结果都是相同的,那样就是一个幂等性

消费端-幂等性保障

  在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

  • 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即时我们收到了多一条一样的消息

  业界主流的幂等性操作:

  • 唯一ID + 指纹码 机制,利用数据库主键去重
    • 唯一ID + 指纹码机制,利用数据库主键去重
    • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
    • 好处:实现简单
    • 坏处:高并发下有数据库写入的性能瓶颈
    • 解决方案:跟进ID进行分库分表进行算法路由
  • 利用Redis的原子性去实现  
    • 使用Redis进行幂等,需要考虑的问题
    • 第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
    • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?

Confirm确认消息

理解Confirm消息确认机制:

  • 消息的确认,是指生产者投递消息后,如何Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接收应答,用来确认这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

  • 第一步:在channel上开启确认模式:channel.confirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

以下是代码的实现

package com.cx.temp.common.rabbitmq.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * 确认模式-生产端
 */
public class Producer {

    public static void main(String[] args)  throws Exception {

        //1 创建一个ConectionFacory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();

        //4 指定我们的消息投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        //5 发送消息
        String msg = "Hello RabbitMQ send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        //6 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("---------ack!---------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("---------no ack!---------");
            }
        });



    }

}
package com.cx.temp.common.rabbitmq.confirm;

import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 确认模式-消费端
 */
public class Consumer {

    public static void main(String[] args)  throws Exception {

        //1 创建一个ConectionFacory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#"; //topic可模糊匹配 *表示可以匹配一个单词,#表示可以匹配多个单词
        String queueName = "test_confirm_queue";

        //声明交换机和队列然后进行绑定,最后指定路由KEY
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }

}

消费的启动后,管理控制台展示

   

运行生产端代码

原文地址:https://www.cnblogs.com/huihui-hui/p/14320729.html