RocketMQ(4.8.0)——Broker 过期文件删除机制

Broker 过期文件删除机制

  RocketMQ 中主要保存了 CommitLog、Consume Queue、Index File 三种数据文件。由于内存和磁盘都是有限的资源,Broker 不可能永久地保存所有数据,所以一些超过保存期限的数据会被定期删除。RocketMQ 通过设置数据过期时间来删除额外的数据文件,具体的实现逻辑是通过 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.start() 方法启动的周期性执行方法 cleanFilesPeriodically()方法,该方法的代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java 来实现的。

 一、CommitLog 文件的删除过程

  CommitLog 文件由 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.CleanCommitLogService 类提供的一个线程服务周期执行删除操作,代码路径:

  1     class CleanCommitLogService {
  2 
  3         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
  4         private final double diskSpaceWarningLevelRatio =
  5             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
  6 
  7         private final double diskSpaceCleanForciblyRatio =
  8             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
  9         private long lastRedeleteTimestamp = 0;
 10 
 11         private volatile int manualDeleteFileSeveralTimes = 0;
 12 
 13         private volatile boolean cleanImmediately = false;
 14 
 15         public void excuteDeleteFilesManualy() {
 16             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
 17             DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
 18         }
 19 
 20         public void run() {
 21             try {
 22                 this.deleteExpiredFiles();  #删除过期文件
 23 
 24                 this.redeleteHangedFile();
 25             } catch (Throwable e) {
 26                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
 27             }
 28         }
 29 
 30         private void deleteExpiredFiles() {
 31             int deleteCount = 0;
 32             long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
 33             int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
 34             int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
 35 
 36             boolean timeup = this.isTimeToDelete();
 37             boolean spacefull = this.isSpaceToDelete();
 38             boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
 39 
 40             if (timeup || spacefull || manualDelete) {
 41 
 42                 if (manualDelete)
 43                     this.manualDeleteFileSeveralTimes--;
 44 
 45                 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
 46 
 47                 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
 48                     fileReservedTime,
 49                     timeup,
 50                     spacefull,
 51                     manualDeleteFileSeveralTimes,
 52                     cleanAtOnce);
 53 
 54                 fileReservedTime *= 60 * 60 * 1000;
 55 
 56                 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
 57                     destroyMapedFileIntervalForcibly, cleanAtOnce);
 58                 if (deleteCount > 0) {
 59                 } else if (spacefull) {
 60                     log.warn("disk space will be full soon, but delete file failed.");
 61                 }
 62             }
 63         }
 64 
 65         private void redeleteHangedFile() {
 66             int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
 67             long currentTimestamp = System.currentTimeMillis();
 68             if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
 69                 this.lastRedeleteTimestamp = currentTimestamp;
 70                 int destroyMapedFileIntervalForcibly =
 71                     DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
 72                 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
 73                 }
 74             }
 75         }
 76 
 77         public String getServiceName() {
 78             return CleanCommitLogService.class.getSimpleName();
 79         }
 80 
 81         private boolean isTimeToDelete() {
 82             String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
 83             if (UtilAll.isItTimeToDo(when)) {
 84                 DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
 85                 return true;
 86             }
 87 
 88             return false;
 89         }
 90 
 91         private boolean isSpaceToDelete() {
 92             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
 93 
 94             cleanImmediately = false;
 95 
 96             {
 97                 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
 98                 if (physicRatio > diskSpaceWarningLevelRatio) {
 99                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
100                     if (diskok) {
101                         DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
102                     }
103 
104                     cleanImmediately = true;
105                 } else if (physicRatio > diskSpaceCleanForciblyRatio) {
106                     cleanImmediately = true;
107                 } else {
108                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
109                     if (!diskok) {
110                         DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
111                     }
112                 }
113 
114                 if (physicRatio < 0 || physicRatio > ratio) {
115                     DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
116                     return true;
117                 }
118             }
119 
120             {
121                 String storePathLogics = StorePathConfigHelper
122                     .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
123                 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
124                 if (logicsRatio > diskSpaceWarningLevelRatio) {
125                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
126                     if (diskok) {
127                         DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
128                     }
129 
130                     cleanImmediately = true;
131                 } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
132                     cleanImmediately = true;
133                 } else {
134                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
135                     if (!diskok) {
136                         DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
137                     }
138                 }
139 
140                 if (logicsRatio < 0 || logicsRatio > ratio) {
141                     DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
142                     return true;
143                 }
144             }
145 
146             return false;
147         }
148 
149         public int getManualDeleteFileSeveralTimes() {
150             return manualDeleteFileSeveralTimes;
151         }
152 
153         public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
154             this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
155         }
156         public boolean isSpaceFull() {
157             String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
158             double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
159             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
160             if (physicRatio > ratio) {
161                 DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
162             }
163             if (physicRatio > this.diskSpaceWarningLevelRatio) {
164                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
165                 if (diskok) {
166                     DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full");
167                 }
168 
169                 return true;
170             } else {
171                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
172 
173                 if (!diskok) {
174                     DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok");
175                 }
176 
177                 return false;
178             }
179         }
180     }

this.deleteExpiredFiles(),当满足3个条件时执行删除操作:

  • 第一,当前时间等于已经配置的删除时间。
  • 第二,磁盘使用空间超过85%。
  • 第三,手动执行删除

上面代码,第56行,DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce),代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java,该方法调用的了return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately),代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java。我么讲讲this.mappedFileQueue.deleteExpiredFileByTime()方法是如何删除 CommitLog文件的,代码如下:

1     public int deleteExpiredFile(
2         final long expiredTime,
3         final int deleteFilesInterval,
4         final long intervalForcibly,
5         final boolean cleanImmediately
6     ) {
7         return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
8     }
 1     public int deleteExpiredFileByTime(final long expiredTime,
 2         final int deleteFilesInterval,
 3         final long intervalForcibly,
 4         final boolean cleanImmediately) {
 5         Object[] mfs = this.copyMappedFiles(0);  #全部 commitLog 文件
 6 
 7         if (null == mfs)
 8             return 0;
 9 
10         int mfsLength = mfs.length - 1;
11         int deleteCount = 0;
12         List<MappedFile> files = new ArrayList<MappedFile>(); #已经删除的文件
13         if (null != mfs) {
14             for (int i = 0; i < mfsLength; i++) {
15                 MappedFile mappedFile = (MappedFile) mfs[i];
16                 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
17                 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { #删除条件:过期或者必须立即删除
18                     if (mappedFile.destroy(intervalForcibly)) {  #关闭文件映射,删除物理文件
19                         files.add(mappedFile);
20                         deleteCount++;
21 
22                         if (files.size() >= DELETE_FILES_BATCH_MAX) {
23                             break;
24                         }
25 
26                         if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
27                             try {
28                                 Thread.sleep(deleteFilesInterval);
29                             } catch (InterruptedException e) {
30                             }
31                         }
32                     } else {
33                         break;
34                     }
35                 } else {
36                     //avoid deleting files in the middle
37                     break;
38                 }
39             }
40         }
41 
42         deleteExpiredFile(files); #删除内存中的文件信息
43 
44         return deleteCount;
45     }

deleteExpiredFileByTime()方法的实现分为如下两步:

  • 克隆全部的 CommitLog 文件。CommitLog 文件可能随时有数据写入,为了不影响正常写入,所以可能一份来操作。
  • 检查每一个 CommitLog 文件是否过期,如果已过期则立即通过调用 destroy() 方法进行删除。在删除前会做一系列检查:检查文件被引用的次数、清理映射的所有内存数据对象、释放内存。清理完成后,删除物理文件。

 二、Consume Queue、Index File 文件的删除过程

  Consume Queue 和 Index File 都是索引文件,在 CommitLog 文件被删除后,对应的索引文件其实没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。

  RocketMQ 的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。

  程序调用 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java 中 deleteExpiredFiles(),代码如下:

 1         private void deleteExpiredFiles() {
 2             int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
 3 
 4             long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();   #CommitLog 全部文件中的最小物理位点。
 5             if (minOffset > this.lastPhysicalMinOffset) { #上次检查到的最小物理位点。当 if (minOffset > this.lastPhysicalMinOffset) 条件成立时,说明当前有新数据没有被检查过,就会
 6             调用 org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset()方法进行检查及删除。
 7                 this.lastPhysicalMinOffset = minOffset;
 8 
 9                 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
10 
11                 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
12                     for (ConsumeQueue logic : maps.values()) {
13                         int deleteCount = logic.deleteExpiredFile(minOffset);
14 
15                         if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
16                             try {
17                                 Thread.sleep(deleteLogicsFilesInterval);
18                             } catch (InterruptedException ignored) {
19                             }
20                         }
21                     }
22                 }
23 
24                 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
25             }
26         }

  org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset() 的代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreMappedFileQueue.java,代码如下:

 1     public int deleteExpiredFileByOffset(long offset, int unitSize) {
 2         Object[] mfs = this.copyMappedFiles(0);
 3 
 4         List<MappedFile> files = new ArrayList<MappedFile>();
 5         int deleteCount = 0;
 6         if (null != mfs) {
 7 
 8             int mfsLength = mfs.length - 1;
 9 
10             for (int i = 0; i < mfsLength; i++) {
11                 boolean destroy;
12                 MappedFile mappedFile = (MappedFile) mfs[i];
13                 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
14                 if (result != null) {
15                     long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
16                     result.release();
17                     destroy = maxOffsetInLogicQueue < offset;  #maxOffsetInLogicQueue:Consume Queue 中最大的位点值。 offset:检查的最小位点。如果maxOffsetInLogicQueue < offset 成立,则说明 Consume Queue 已经过期了,可以删除。
18                     if (destroy) {
19                         log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
20                             + maxOffsetInLogicQueue + ", delete it");
21                     }
22                 } else if (!mappedFile.isAvailable()) { // Handle hanged file. 说明存储服务已经被关闭(或者该文件曾经被删除,但是删除失败)
23                     log.warn("Found a hanged consume queue file, attempting to delete it.");
24                     destroy = true;
25                 } else {
26                     log.warn("this being not executed forever.");
27                     break;
28                 }
29 
30                 if (destroy && mappedFile.destroy(1000 * 60)) {
31                     files.add(mappedFile);
32                     deleteCount++;
33                 } else {
34                     break;
35                 }
36             }
37         }
38 
39         deleteExpiredFile(files);
40 
41         return deleteCount;
42     }
原文地址:https://www.cnblogs.com/zuoyang/p/14465764.html