栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-下篇

源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-下篇

this.allocateMappedFileService.shutdown();

}

return result;

}

代码@1:判断 ${ROCKET_HOME}/storepath/abort 文件是否存在,如果文件存在,则返回true,否则返回false,这个文件的作用是什么呢?原来,在DefaultMessageStore 启动时创建,在 shutdown 时删除,也就是如果该文件存在,说明不是正常的关闭。

private boolean isTempFileExist() {

String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());

File file = new File(fileName);

return file.exists();

}

代码@2:延迟消息启动。

代码@3:commitlog文件加载。

代码@4:加载consumerqueue文件。

代码@5:文件存储检测点。

代码@6:索引文件加载。

代码@7:文件检测恢复。

接下来,本文重点分析步骤3-6都是基于物理磁盘上的文件,构建成内存映射文件(MappedFile)。

代码@7:验证commitlog、consumequeue、索引文件直接的一致性检测,也是本文重点分析内容。

1、文件恢复

======

DefaultMessageStore#recover

private void recover(final boolean lastExitOK) {

this.recoverConsumeQueue(); // @1

if (lastExitOK) { // @2

this.commitLog.recoverNormally(); // @21

} else {

this.commitLog.recoverAbnormally(); // @22

}

this.recoverTopicQueueTable(); // @3

}

代码@1:恢复消息队列。

代码@2:如果是正常退出,则按照正常修复;如果是异常退出,则走异常修复逻辑。

代码@3,修复主题队列。

1.1 消息队列恢复 DefaultMessageStore#recoverConsumeQueue


ConsumeQueue#recover

public void recover() {

final List mappedFiles = this.mappedFileQueue.getMappedFiles(); // @1

if (!mappedFiles.isEmpty()) {

int index = mappedFiles.size() - 3;

if (index < 0) // @2

index = 0;

int mappedFileSizeLogics = this.mappedFileSize; // @3 start

MappedFile mappedFile = mappedFiles.get(index);

ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

long processOffset = mappedFile.getFileFromOffset(); // @3 end

long mappedFileOffset = 0;

long maxExtAddr = 1;

while (true) { //

for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { // 4 start

long offset = byteBuffer.getLong(); // @5 start

int size = byteBuffe

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

r.getInt();

long tagsCode = byteBuffer.getLong(); // @5 end

if (offset >= 0 && size > 0) {

mappedFileOffset = i + CQ_STORE_UNIT_SIZE;

this.maxPhysicOffset = offset;

if (isExtAddr(tagsCode)) {

maxExtAddr = tagsCode;

}

} else {

log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "

  • offset + " " + size + " " + tagsCode);

break;

}

} // @4 end

if (mappedFileOffset == mappedFileSizeLogics) { // @6

index++;

if (index >= mappedFiles.size()) {

log.info("recover last consume queue file over, last maped file "

  • mappedFile.getFileName());

break;

} else {

mappedFile = mappedFiles.get(index);

byteBuffer = mappedFile.sliceByteBuffer();

processOffset = mappedFile.getFileFromOffset();

mappedFileOffset = 0;

log.info("recover next consume queue file, " + mappedFile.getFileName());

}

} else {

log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "

  • (processOffset + mappedFileOffset));

break;

}

}

processOffset += mappedFileOffset; // @7

this.mappedFileQueue.setFlushedWhere(processOffset); // @8

this.mappedFileQueue.setCommittedWhere(processOffset); // @9

this.mappedFileQueue.truncateDirtyFiles(processOffset); // @10

if (isExtReadEnable()) {

this.consumeQueueExt.recover();

log.info(“Truncate consume queue extend file by max {}”, maxExtAddr);

this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);

}

}

}

代码@1:获取该消息队列的所有内存映射文件。

代码@2:只从倒数第3个文件开始,这应该是一个经验值。

代码@3 :首先介绍几个局部变量。

  • mappedFileSizeLogics

consumequeue 逻辑大小。

  • mappedFile

该queue对应的内存映射文件。

  • byteBuffer

内存映射文件对应的ByteBuffer。

  • processOffset :

处理的 offset,默认从 consumequeue 中存放的第一个条目开始。

代码@4:循环验证 consumeque 包含条目的有效性(如果offset大于0并且size大于0,则表示是一个有效的条目)

代码@5:读取一个条目的内容。

  • offset :commitlog中的物理偏移量

  • size : 该条消息的消息总长度

  • tagsCode :tag hashcode

如果 offset大于0并且size大于0,则表示是一个有效的条目,设置 consumequeue 中有效的 mappedFileOffset ,继续下一个条目的验证,如果发现不正常的条目,则跳出循环。

代码@6:如果该 consumeque 文件中所有条目全部有效,则继续验证下一个文件,(index++),如果发现条目不合法,后面的文件不需要再检测。

代码@7,:processOffset 代表了当前 consuemque 有效的偏移量。

代码8,@9:设置 flushedWhere,committedWhere 为当前有效的偏移量。

代码@10:截断无效的consumeque文件。

public void truncateDirtyFiles(long offset) {

List willRemoveFiles = new ArrayList();

for (MappedFile file : this.mappedFiles) {

long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;

if (fileTailOffset > offset) { // @1

if (offset >= file.getFileFromOffset()) {

file.setWrotePosition((int) (offset % this.mappedFileSize));

file.setCommittedPosition((int) (offset % this.mappedFileSize));

file.setFlushedPosition((int) (offset % this.mappedFileSize));

} else {

file.destroy(1000); // @2

willRemoveFiles.add(file);

}

}

}

this.deleteExpiredFile(willRemoveFiles); // @3

}

该方法主要就是再次遍历所有的 MappedFile,如果无效的 offset 大于 该 consumeque,则无需处理。

如果无效的 offset 小于该文件最大的偏移量,如果 consumequeue 的 offset 大于失效的 offset,则该文件整个删除,如果否,则设置 wrotePosition,commitedPosition,flushedPoisition 的值即可。

由此可见,DefaultMessageStore#recoverConsumeQueue 主要要做的就是先移除非法的offset。

下面代码摘录自:DefaultMessageStore#recover

if (lastExitOK) {

this.commitLog.recoverNormally();

} else {

this.commitLog.recoverAbnormally();

}

lastExitOk 为true,表示abort文件不存在,表示是正常退出,如果abort文件存在,则表示异常退出,

1.2 commitlog正常恢复


CommitLog#recoverNormally commitlog正常恢复与 ConsumeQueue 的恢复差不多的逻辑,就不重复跟踪。

1.3 commitlog异常恢复


CommitLog#recoverAbnormally

public void recoverAbnormally() {

// recover by the minimum time stamp

boolean checkCRConRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRConRecover();

final List mappedFiles = this.mappedFileQueue.getMappedFiles();

if (!mappedFiles.isEmpty()) {

// Looking beginning to recover from which file

int index = mappedFiles.size() - 1;

MappedFile mappedFile = null;

for (; index >= 0; index–) { // @1

mappedFile = mappedFiles.get(index);

if (this.isMappedFileMatchedRecover(mappedFile)) {

log.info("recover from this maped file " + mappedFile.getFileName());

break;

}

}

if (index < 0) {

index = 0;

mappedFile = mappedFiles.get(index);

}

ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

long processOffset = mappedFile.getFileFromOffset();

long mappedFileOffset = 0;

while (true) {

DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // @2

int size = dispatchRequest.getMsgSize();

// Normal data

if (size > 0) {

mappedFileOffset += size;

if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {

if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {

this.defaultMessageStore.doDispatch(dispatchRequest);

}

} else {

this.defaultMessageStore.doDispatch(dispatchRequest);

}

}

// Intermediate file read error

else if (size == -1) {

log.info("recover physics file end, " + mappedFile.getFileName());

break;

}

// Come the end of the file, switch to the next file

// Since the return 0 representatives met last hole, this can

// not be included in truncate offset

else if (size == 0) {

index++;

if (index >= mappedFiles.size()) {

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680136.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号