RocketMQ(4.8.0)——Broker 的关机恢复机制

Broker 的关机恢复机制

一、Broker关机恢复概述

  Broker关机恢复是指恢复 CommitLog、Consume Queue、Index File 等数据文件。Broker 关机分为正常调用命令关机和异常被迫进程停止关机两种情况。恢复过程的设计目标是使进程正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失。与关机恢复相关的主要文件有两个:abort 和 checkpoint。

  abort 是一个空文件,标记当前 Broker 是否正常关机,Broker 进程正常启动的时候,创建该文件。Broker进程正常停止后,该文件会被删除;如果异常退出,则文件依旧存在,创建和删除的过程如下:

  checkpoint 是检查点文件,保存 Broker 最后正常存储各种数据的时间,在重启 Broker 时,恢复程序知道从什么时刻恢复数据。检查点逻辑由 org.apache.rocketmq.store.StoreCheckpoint 类实现。

  在 StoreCheckpoint 类中保存了 3 个时间,更新过程如下图:

StoreCheckpoint 中存储的 3 个时间参数:

  • physicMsgTimestamp:最后一条已存储 CommitLog 的消息的存储时间。
  • logicsMsgTimestamp:最后一条已存储 Consume Queue 的消息的存储时间。
  • indexMsgTimestamp:最后一条已存储 Index File 的消息的存储时间。

  physicMsgTimestamp 和 logicsMsgTimestamp 的更新都是在数据存储成功后进行的,过程比较简单。而 indexMsgTimestamp 的逻辑是在 Index File 刷盘时被更新的,Index File 刷盘方法 org.apache.rocketmq.store.index.IndexService.flush()。在Index File 刷盘后,已刷盘文件的最后存储消息时间被赋值给 indexMsgTimestamp,并对 Checkpoint 文件进行刷盘。

二、Broker关机恢复流程

  Broker 在启动时会初始化 abort、checkpoint 两个文件。正常关闭进程时会删除 abort 文件,将 checkpoint 文件刷盘;异常关闭时,通常来不及删除 abort 文件。由此,在重新启动 Broker 时会根据 abort 判断是否需要异常停止进程,而后恢复数据。

  Broker 启动时,会启动存储服务 org.apache.rocketmq.store.DefaultMessageStore。存储服务在初始化时会执行 load 方法加载全部数据。数据加载过程见下图:

  Broker 关机的恢复过程可以分为以下几步:

  第一步:Broker 异常退出检查。如果 abort 文件存在,说明上次是异常退出。

  第二步:加载延迟消息的位点信息。ScheduleMessageService 服务通过继承和重写 ConfigManager,调用个 load() 方法从磁盘加载延迟位点文件的内容,并根据配置项 messageDelayLevel 初始化延迟级别。

  第三步:加载全部 CommitLog文件,如上图#1部分。通过读取 CommitLog 目录下的所有文件,依次加载每个 CommitLog 为 MappedFile,并且设置写指针、已刷盘指针、已提交指针,使所有指针都指向该文件的最末位。

  第四步:加载全部 Consume Queue 文件及数据,如上图 #2、#3。调用 loadConsumeQueue 方法,读取 consumequeue/topic/queueId/目录,加载全部 topic、queueId 作为 ConsumeQueue 对象,再调用 load 方法初始化每一个 ConsumeQueue。

  第五步:初始化 Checkpoint 文件为 StoreCheckpoint 对象,并且初始化三个数据:physicMsgTimestamp、logicsMsgstamp、indexMsgTimestamp。

  第六步:加载 Index File 索引,如上图#4所示。加载index目录下全部索引文件,如果上次进程异常退出并且索引文件操作的最后时间戳大于 Checkpoint 中保存的时间,则说明当前文件有部分数据可能存在错误,须立即销毁文件。

  第七步:回复全部数据,如上图#5部分。

原文地址:https://www.cnblogs.com/zuoyang/p/14467949.html