RocketMQ(4.8.0)——Broker消息存储机制

Broker消息存储机制

  RocketMQ 使用 CommitLog 文件将消息存储到磁盘上,那么 RocketMQ 存储消息到磁盘的过程是怎么样的呢?

  RocketMQ 首先将消息数据写入操作系统 PageCache,然后定时将数据刷入磁盘。

一、Broker 消息存储的流程是什么?

  下面主要介绍 RocketMQ 是如何接收发送消息请求并将消息写入 PageCache 的,整个过程如下:

(1)Broker 接收客户端发送消息的请求并做预处理。

  SendMessageProcessor.processRequest()方法会自动被调用者接收、解析客户端请求为消息实例。

    • 解析请求参数
    • 执行发送处理前的 Hook
    • 调用保存方法存储消息
    • 执行发送处理后的 Hook

(2)Broker存储前预处理消息。

  预处理方法:org.apache.rocketmq.broker.processor.SendMessageProcessor.sendMessage(),

  首先,设置请求处理返回对象标志,代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerprocessorSendMessageProcessor.java,代码如下:

1     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
2     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
3 
4     response.setOpaque(request.getOpaque());

  Netty 是异步执行的,请求发送到 Broker 被处理后,返回结果时,在客户端的处理线程已经不再是发送请求的线程,那么客户端如何确定返回结果对应哪个请求呢? 通过返回标志来判断。

  其次,做一些列存储前发送请求的数据检查,比如死信消息处理、Broker 是否拒绝事务消息处理、消息基本检查等。消息基本检查方法代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerprocessorAbstractSendMessageProcessor.java 中 msgCheck(),该方法的主要功能如下:

    • 校验 Broker 是否配置可写
    • 校验 Topic 名字是否为莫认证
    • 校验 Topic 配置是否存在
    • 校验 queueId 与读写队列数是否匹配
    • 校验 Broker 是否支持事务消息(msgCheck之后进行的校验)

 (3)执行 DefaultMessageStore.putMessage() 方法进行消息校验和存储模块检查

  在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对 Broker 做是否 Slave 的检查等,总结如下:

    • 校验存储模块是否已经关闭
    • 校验 Broker 是否是Slave
    • 校验存储模块运行标记
    • 校验 Topic 长度
    • 校验扩展信息的长度
    • 校验操作系统 PageCache 是否繁忙。

  (4)执行 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java putMessage() 方法,将消息写入 CommitLog。

  存储消息的核心处理过程如下:

    • 设置消息保存时间为当前时间戳,设置消息完整性校验码 CRC(循环冗余码)。
    • 延迟消息处理。如果发送的消息是延迟消息,这里会单独设置延迟消息的数据字段,比如修改 Topic 为延迟消息特有的 Topic —— SCHEDULE_TOPIC_XXXX,并且备份原来的 Topic 和 queueId,以便延迟消息在投递后被消费者消费。

延迟消息的处理代码如下:

 1         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 2         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
 3                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 4             // Delay Delivery
 5             if (msg.getDelayTimeLevel() > 0) {
 6                 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 7                     msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 8                 }
 9 
10                 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
11                 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
12 
13                 // Backup real topic, queueId
14                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
15                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
16                 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
17 
18                 msg.setTopic(topic);
19                 msg.setQueueId(queueId);
20             }
21         }
  • 获取最后一个 CommitLog 文件实例 MappedFile,锁住该 MappedFile。默认为自旋锁,也可以通过 useReentrantLockWhenPutMessage 进行配置、修改和使用 ReentrantLock。
  • 校验最后一个 MappedFile,如果结果为空或已写满,则新创建一个 MappedFile 返回。
  • 调用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final AppendMessageCallback cb),将消息写入 MappedFile。

根据消息是单个消息还是批量消息来调用 AppendMessageCallback.doAppend()方法,并将消息写入 Page Cache,该方法的功能包含以下几点:

  1. 查找即将写入的消息物理机 Offset
  2. 事务消息单独处理。这里主要处理 Prepared 类型和 Rollback 类型的消息,设置消息 queueOffset 为 0 。
  3. 序列化消息,并将序列化结果保存到 ByteBuffer 中(文件内存映射的 Page Cache 或 Direct Memory,简称 DM)。特别地,如果将刷盘设置为异步刷盘,那么当ransientStorePoolEnablTrue时,会先写入DM,DM中的数据再异步写入文件内存映射的Page Cache 中,因为消费者始终是从 Page Cache 中读取消息消费的,所以这个机制也称为 "读写分离"。
  4. 更新消息所在 Queue 的位点。

  在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush() 方法,代码如下

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5             if (messageExt.isWaitStoreMsgOK()) {
 6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7                 service.putRequest(request);
 8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
 9                 PutMessageStatus flushStatus = null;
10                 try {
11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
12                             TimeUnit.MILLISECONDS);
13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
14                     //flushOK=false;
15                 }
16                 if (flushStatus != PutMessageStatus.PUT_OK) {
17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
18                         + " client address: " + messageExt.getBornHostString());
19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
20                 }
21             } else {
22                 service.wakeup();
23             }
24         }
25         // Asynchronous flush
26         else {
27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
28                 flushCommitLogService.wakeup();
29             } else {
30                 commitLogService.wakeup();
31             }
32         }
33     }

和handleHA()方法,代码如下:

 1     public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
 3             HAService service = this.defaultMessageStore.getHaService();
 4             if (messageExt.isWaitStoreMsgOK()) {
 5                 // Determine whether to wait
 6                 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
 7                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 8                     service.putRequest(request);
 9                     service.getWaitNotifyObject().wakeupAll();
10                     PutMessageStatus replicaStatus = null;
11                     try {
12                         replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
13                                 TimeUnit.MILLISECONDS);
14                     } catch (InterruptedException | ExecutionException | TimeoutException e) {
15                     }
16                     if (replicaStatus != PutMessageStatus.PUT_OK) {
17                         log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
18                             + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
19                         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
20                     }
21                 }
22                 // Slave problem
23                 else {
24                     // Tell the producer, slave not available
25                     putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
26                 }
27             }
28         }
29 
30     }

  在 Broker 处理发送消息请求时,由于处理器 SendMessageProcessor 本身是一个线程池服务,所以设计了快速失败逻辑,方便在高峰时自我保护,代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerlatencyBrokerFastFailure.java中的cleanExpiredRequest()方法,代码如下:

 1     private void cleanExpiredRequest() {
 2         while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
 3             try {
 4                 if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
 5                     final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
 6                     if (null == runnable) {
 7                         break;
 8                     }
 9 
10                     final RequestTask rt = castRunnable(runnable);
11                     rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
12                 } else {
13                     break;
14                 }
15             } catch (Throwable ignored) {
16             }
17         }
18 
19         cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
20             this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
21 
22         cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
23             this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
24 
25         cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
26             this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
27 
28         cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
29             .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
30     }

  在 BrokerController 启动 BrokerFastFailure 服务时,会启动一个定时任务处理快速失败的异常,启动及扫描代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerlatencyBrokerFastFailure.java,具体代码如下:

    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                    cleanExpiredRequest(); #每间隔 10ms 执行一次该方法,清理非法、过期的请求。
                }
            }
        }, 1000, 10, TimeUnit.MILLISECONDS);
    }

cleanExpiredRequest()方法处理方式有3种:

  1. 系统繁忙时发送消息请求快速失败处理。当操作系统 PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作队列中取出来,直接返回SYSTEM_BUSY。如果此种情况发生说明系统已经不堪重负,需要增加系统资源或者扩容来减轻当前 Broker 的压力。
  2. 发送请求超时处理。
  3. 拉取消息请求超时处理。

  第2种和第3种的代码逻辑和第1种代码逻辑处理类似,如果出现了,说明请求在线程池的工作队列中的排队时间超过预期配置的时间,那么增加排队等待时间即可。如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容。

二、Broker如何保证高效存储?——内存映射机制与高效写磁盘

  RocketMQ 在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐。

  那么这些怎么实现的呢?

  RocketMQ 的基本数据结构:

  org.apache.rocketmq.store.CommitLog:RocketMQ 对存储消息的物理文件的抽象实现,也就是物理 CommitLog 文件的具体实现。

  org.apache.rocketmq.store.MappedFile:CommitLog 文件在内存中的映射文件,映射文件同时具有内存的写入速度和磁盘一样可靠的持久化方式。

  org.apache.rocketmq.store.MappedFileQueue:映射文件队列中有全部的 CommitLog 映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件中。

  CommitLog、MappedFile、MappedFileQueue 与物理 CommitLog 文件的关系如下:

  每个 MappedFileQueue 包含多个 MappedFile,就是真实的物理 CommitLog文件,Java 通过 java.nio.MappedByteBuffer 来实现文件的内存映射,即文件读写都是通过 MappedByteBuffer(其实是 Page Cache)来操作的。

  写入数据时先加锁,然后通过 Append 方式写入最新 MappedFile。对于读取消息,大部分情况下用户只关心最新数据,而这些数据都在 Page Cache 中,也就是说,读写文件就是在 Page Cache 中进行的,其速度几乎等于志杰操作内存的速度。

三、文件刷盘机制

  消息存储完成后,会被操作系统持久化到磁盘,也就是刷盘。

  RocketMQ 支持2种刷盘方式,在 Broker 启动时:

    • 配置 flushDiskType = SYNC_FLUSH 表示同步刷盘
    • 配置 flushDiskType = ASYNC_FLUSH 表示异步刷盘

                                                 GroupCommitService 就是 org.apahce.rocketmq.store.CommitLog.GroupCommitServie —— 同步刷盘服务。在 Broker 存储消息到 Page Cache 后,同步将 Page Cache 刷到磁盘,再返回客户端消息并写入结果,具体过程如下所示: 

  FlushRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.FlushRealTimeService —— 异步刷盘服务。在 Broker 存储消息到 Page Cache 后,立即返回客户端写入结果,然后异步刷盘服务将 Page Cache 异步刷盘到磁盘。

  CommitRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.CommitRealTimeService —— 异步转存服务。Broker 通过配置读写分离将消息写入直接内存(Direct Memory),简称 DM),然后通过异步转存服务,将 DM 中的数据再次存储到 Page Cache 中,以供异步刷盘服务将 Page Cache 刷到磁盘中,转存服务过程如下:

  将消息成功保存到 CommitLog 映射文件后,调用 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush() 方法处理刷盘逻辑,代码如下:

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5             if (messageExt.isWaitStoreMsgOK()) {
 6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7                 service.putRequest(request);
 8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
 9                 PutMessageStatus flushStatus = null;
10                 try {
11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
12                             TimeUnit.MILLISECONDS);
13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
14                     //flushOK=false;
15                 }
16                 if (flushStatus != PutMessageStatus.PUT_OK) {
17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
18                         + " client address: " + messageExt.getBornHostString());
19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
20                 }
21             } else {
22                 service.wakeup();
23             }
24         }
25         // Asynchronous flush
26         else {
27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
28                 flushCommitLogService.wakeup();
29             } else {
30                 commitLogService.wakeup();
31             }
32         }
33     }

  通过以上代码可知,同步刷盘、异步刷盘都是在这里发起的。异步刷盘的实现根据是否配置读写分离机制而稍有不同。

  接下来我们介绍两种刷盘方式:

  (1)同步刷盘:

  同步刷盘是一个后台线程服务 ,消息进行同步刷盘的流程如下图:

  存储消息线程:主要负责将消息存储到 Page Cache 或者 DM 中,存储成功后通过调用 handleDiskFlush() 方法将同步刷盘请求 "发送" 给 GroupCommitService 服务,并在该刷盘请求上执行锁等待,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush(),具体代码如下:

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5             if (messageExt.isWaitStoreMsgOK()) {           #客户端可以设置,默认为True
 6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7                 service.putRequest(request);               #保存同步磁盘请求
 8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); #请求同步锁等待
 9                 PutMessageStatus flushStatus = null;
10                 try {
11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
12                             TimeUnit.MILLISECONDS);
13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
14                     //flushOK=false;
15                 }
16                 if (flushStatus != PutMessageStatus.PUT_OK) {
17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() #记录刷盘超时设置
18                         + " client address: " + messageExt.getBornHostString());
19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
20                 }
21             } else {
22                 service.wakeup();    #异步刷盘,不用同步返回
23             }
24         }
25         // Asynchronous flush
26         else {
27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
28                 flushCommitLogService.wakeup();
29             } else {
30                 commitLogService.wakeup();
31             }
32         }
33     }

  同步刷盘服务线程:通过 GroupCommitService 类实现的同步刷盘服务。

  具体同步刷盘是怎么执行的,执行完成后又是如何将刷盘结果通知存储数据线程的呢?

  正常同步刷盘线程会间隔 10ms 执行一次 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法,该方法循环每一个同步刷盘请求,如果刷盘成功,那么唤醒等待刷盘请求锁的存储消息线程,并告知刷盘成功。

  由于操作系统刷盘耗时及每次刷多少字节数据到磁盘等,都不是 RocketMQ 进程能掌控的,所以在每次刷盘前都需要做必要的检查,以确认当前同步刷盘请求对应位点的消息是否已经被刷盘,如果已经被刷盘,当前刷盘请求就不需要执行。

  在 RocketMQ 进程正常关闭时,如果有同步刷盘请求未执行完,那么数据会丢失吗?

  答案是:不会的。在上图,我们得知,关闭刷盘服务时,会执行 Thread.sleep(10) 等待所有的同步刷盘请求保存到刷盘请求队列中后,交换保存刷盘请求的队列,再执行 doCommit() 方法。

(2)异步刷盘:

  如果 Broker 配置读写分离,则异步刷盘过程包含异步转存数据和真正的异步刷盘操作。

  异步转存数据是通过 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法实现的。

  下面将介绍异步转存数据服务的核心的执行过程。

  (1)获取转存参数。整个转存过程的参数都是可配置的。

  (2)执行转存数据。

  (3)转存失败,唤醒异步刷盘线程。转存数据失败,并不代表没有数据被转存到 Page Cache 中,而是说明有部分数据转存成功,部分数据转存失败。所以可以唤醒刷盘线程执行刷盘操作。而如果转存成功,则正常执行异步刷盘即可。

  在异步转存服务和存储服务把消息写入 Page Cache 后,由异步刷盘将消息刷入磁盘中。异步刷盘服务的主要功能是将 Page Cache 中的数据异步刷入磁盘,并记录 Checkpoint 信息。异步刷盘的实现代码主要在 org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run() 方法中,步骤拆解如下:

  第一步:获取刷盘参数;

  第二步:等待刷盘间隔;

  第三步:执行刷盘;

  第四步:记录 CheckPoint 和耗时日志。这里主要记录最后刷盘成功过时间和刷盘耗时超过 500ms 的情况。

总结:

原文地址:https://www.cnblogs.com/zuoyang/p/14447942.html