this.allocateMappedFileService.shutdown();
}
return result;
}
代码@1:判断 ${ROCKET_HOME}/storepath/abort 文件是否存在,如果文件存在,则返回true,否则返回false,这个文件的作用是什么呢?原来,在DefaultMessageStore 启动时创建,在 shutdown 时删除,也就是如果该文件存在,说明不是正常的关闭。
private boolean isTempFileExist() {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
}
代码@2:延迟消息启动。
代码@3:commitlog文件加载。
代码@4:加载consumerqueue文件。
代码@5:文件存储检测点。
代码@6:索引文件加载。
代码@7:文件检测恢复。
接下来,本文重点分析步骤3-6都是基于物理磁盘上的文件,构建成内存映射文件(MappedFile)。
代码@7:验证commitlog、consumequeue、索引文件直接的一致性检测,也是本文重点分析内容。
1、文件恢复
======
DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue(); // @1
if (lastExitOK) { // @2
this.commitLog.recoverNormally(); // @21
} else {
this.commitLog.recoverAbnormally(); // @22
}
this.recoverTopicQueueTable(); // @3
}
代码@1:恢复消息队列。
代码@2:如果是正常退出,则按照正常修复;如果是异常退出,则走异常修复逻辑。
代码@3,修复主题队列。
1.1 消息队列恢复 DefaultMessageStore#recoverConsumeQueue
ConsumeQueue#recover
public void recover() {
final List mappedFiles = this.mappedFileQueue.getMappedFiles(); // @1
if (!mappedFiles.isEmpty()) {
int index = mappedFiles.size() - 3;
if (index < 0) // @2
index = 0;
int mappedFileSizeLogics = this.mappedFileSize; // @3 start
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset(); // @3 end
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) { //
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { // 4 start
long offset = byteBuffer.getLong(); // @5 start
int size = byteBuffe
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
r.getInt();
long tagsCode = byteBuffer.getLong(); // @5 end
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
- offset + " " + size + " " + tagsCode);
break;
}
} // @4 end
if (mappedFileOffset == mappedFileSizeLogics) { // @6
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
- mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
- (processOffset + mappedFileOffset));
break;
}
}
processOffset += mappedFileOffset; // @7
this.mappedFileQueue.setFlushedWhere(processOffset); // @8
this.mappedFileQueue.setCommittedWhere(processOffset); // @9
this.mappedFileQueue.truncateDirtyFiles(processOffset); // @10
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info(“Truncate consume queue extend file by max {}”, maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
代码@1:获取该消息队列的所有内存映射文件。
代码@2:只从倒数第3个文件开始,这应该是一个经验值。
代码@3 :首先介绍几个局部变量。
- mappedFileSizeLogics
consumequeue 逻辑大小。
- mappedFile
该queue对应的内存映射文件。
- byteBuffer
内存映射文件对应的ByteBuffer。
- processOffset :
处理的 offset,默认从 consumequeue 中存放的第一个条目开始。
代码@4:循环验证 consumeque 包含条目的有效性(如果offset大于0并且size大于0,则表示是一个有效的条目)
代码@5:读取一个条目的内容。
-
offset :commitlog中的物理偏移量
-
size : 该条消息的消息总长度
-
tagsCode :tag hashcode
如果 offset大于0并且size大于0,则表示是一个有效的条目,设置 consumequeue 中有效的 mappedFileOffset ,继续下一个条目的验证,如果发现不正常的条目,则跳出循环。
代码@6:如果该 consumeque 文件中所有条目全部有效,则继续验证下一个文件,(index++),如果发现条目不合法,后面的文件不需要再检测。
代码@7,:processOffset 代表了当前 consuemque 有效的偏移量。
代码8,@9:设置 flushedWhere,committedWhere 为当前有效的偏移量。
代码@10:截断无效的consumeque文件。
public void truncateDirtyFiles(long offset) {
List willRemoveFiles = new ArrayList();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) { // @1
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000); // @2
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles); // @3
}
该方法主要就是再次遍历所有的 MappedFile,如果无效的 offset 大于 该 consumeque,则无需处理。
如果无效的 offset 小于该文件最大的偏移量,如果 consumequeue 的 offset 大于失效的 offset,则该文件整个删除,如果否,则设置 wrotePosition,commitedPosition,flushedPoisition 的值即可。
由此可见,DefaultMessageStore#recoverConsumeQueue 主要要做的就是先移除非法的offset。
下面代码摘录自:DefaultMessageStore#recover
if (lastExitOK) {
this.commitLog.recoverNormally();
} else {
this.commitLog.recoverAbnormally();
}
lastExitOk 为true,表示abort文件不存在,表示是正常退出,如果abort文件存在,则表示异常退出,
1.2 commitlog正常恢复
CommitLog#recoverNormally commitlog正常恢复与 ConsumeQueue 的恢复差不多的逻辑,就不重复跟踪。
1.3 commitlog异常恢复
CommitLog#recoverAbnormally
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRConRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRConRecover();
final List mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index–) { // @1
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // @2
int size = dispatchRequest.getMsgSize();
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Intermediate file read error
else if (size == -1) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {



