杭州政府网站建设潍坊今日头条新闻
消息储存服务加载
入口:org.apache.rocketmq.store.DefaultMessageStore#load
在创建brokerContriller时会调用初始化方法初始化brokerController,在初始化方法中会进行消息储存服务的加载
this.messageStore.load();
加载方法主要是加载一些必要的参数和数据,如配置文件、消费进度、commitlog文件、consumequeu文件等
加载commitlog等文件,实际上就是创建MappedFile对象,MappedFile会创建与物理文件之间的映射(buffer)与通道(channel)
除此之外, 还会恢复commitlog文件、consumequeue文件, 恢复是指, 检索文件中的每一条消息, 查看是否有效: commitlog中的消息是否符合规范 , consumeQueue中的数据是否符合规范每条是否为20字节。因为消息、数据都是从文件最后开始添加的,故只需文件尾部不符合规范的数据删除掉,即可。
在broker启动的时候会在broker储存目录下创建一个abort文件, 正常退出时则会删除abort文件,broker启动的时候会判断是否存在这个文件来判断是否是正常退出,还是意外宕机。
是否正常退出, 只关系到broker恢复commitlog文件的过程:
若为正常退出,则从倒数第三个文件开始向后检索,若不足三个则从第一个开始向后遍历。找到第一条错误数据,删除其之后的数据,并重新设置刷盘点
若为异常退出,则从最后一个文件开始,向前根据文件魔数等找到一个正常的文件,从该文件开始向后遍历,正常的消息重新发送到consumequeque和indexFile。若为错误数据,则删除其之后的数据,并重新设置刷盘点
同时,两种恢复路径的最后,都会以恢复的commitlog文件的最大有效消息的偏移量为准,来删除consumequeue那边的无效数据
public boolean load() {boolean result = true;try {// 通过判断 abort 文件是否存在,判断是否上次为正常退出。不存在 - 正常boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");// 加载延时队列,延时消息, 包括延时等级、配置文件if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();}// load Commit Log// 加载commitlog mappedFile(commitlog文件夹下的)result = result && this.commitLog.load();// load Consume Queue// 加载consumeQueue mappedFile, (store\consumequeue目录下的)result = result && this.loadConsumeQueue();if (result) {this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));this.indexService.load(lastExitOK);// 恢复文件this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result;
}
加载commitlog文件
加载commitlog文件: org.apache.rocketmq.store.CommitLog#load
public boolean load() {// commitlog文件在代码是一个MappedFile,通过MappedFileQueue进行管理// consumequeue、indexFile文件同样也是MappedFileboolean result = this.mappedFileQueue.load();log.info("load commit log " + (result ? "OK" : "Failed"));return result;
}
public boolean load() {// 遍历文件夹下面的文件File dir = new File(this.storePath);File[] files = dir.listFiles();if (files != null) {// ascending order// 按文件名从大到小排序,rocketmq种的文件都是以数字、文件的最小偏移量命名Arrays.sort(files);for (File file : files) {if (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, please check it manually");return false;}try {// 实例化mappedFileMappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}}return true;
}
构造MappedFile
public MappedFile(final String fileName, final int fileSize) throws IOException {// 根据文件名和文件大小初始化mappedFileinit(fileName, fileSize);
}
javaprivate void init(final String fileName, final int fileSize) throws IOException {this.fileName = fileName;this.fileSize = fileSize;this.file = new File(fileName);this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;ensureDirOK(this.file.getParent());try {// 根据文件创建channelthis.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 文件总内存计数增加TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);// 文件总数计数增加TOTAL_MAPPED_FILES.incrementAndGet();ok = true;} catch (FileNotFoundException e) {log.error("Failed to create file " + this.fileName, e);throw e;} catch (IOException e) {log.error("Failed to map file " + this.fileName, e);throw e;} finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}
}
恢复文件
恢复文件: org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {/*** maxPhyOffsetOfConsumeQueue 最大的有效的commitlog消息物理偏移量,就是指的最后一个有效条目中保存的commitlog文件中的物理偏移量,** 文件组自身的最大有效数据偏移量processOffset,就是指的最后一个有效条目在自身文件组中的偏移量* (比如:queueID文件夹下的连续的文件,或commitlog目录下的连续commitlog文件)。*/// 恢复ConsumeQueue文件,并返回consumeQueue最大有效commitlog文件偏移量long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();if (lastExitOK) {// 正常退出,恢复文件this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {// 异常退出,恢复文件this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}// 恢复topicQueueTablethis.recoverTopicQueueTable();
}
恢复consumequeue文件
org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
恢复consumequeue文件,会遍历每个topic,然后遍历topic下的所有queue的文件夹,然后以consumequeue文件夹为维度恢复
从倒数第三个文件开始恢复,依次检查每条数据,是否符合8字节偏移量、4字节消息大小、8字节tag的规则,找到异常数据,则删除其之后的数据。
private long recoverConsumeQueue() {long maxPhysicOffset = -1;// 遍历topicfor (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {// 遍历topic下的queuefor (ConsumeQueue logic : maps.values()) {// 恢复这个queueId下的文件logic.recover();if (logic.getMaxPhysicOffset() > maxPhysicOffset) {maxPhysicOffset = logic.getMaxPhysicOffset();}}}// 返回所有queue中消息,在commitLog文件中的最大的偏移量// 也就是,commitLog同步到queue中的最后一条消息的偏移量return maxPhysicOffset;
}
public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// 从倒数第三个开始,如果少于三个就从第一个开始int index = mappedFiles.size() - 3;if (index < 0)index = 0;int mappedFileSizeLogics = this.mappedFileSize;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;long maxExtAddr = 1;while (true) {// 循环每一条消息索引,大小为 20 字节for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {// 这条在commitLog文件中的的偏移量long offset = byteBuffer.getLong();// 大小int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();// 如果offset和size都大于0则表示当前条目有效if (offset >= 0 && size > 0) {mappedFileOffset = i + CQ_STORE_UNIT_SIZE;this.maxPhysicOffset = offset + size;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}} else {log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "+ offset + " " + size + " " + tagsCode);break;}}// 如果当前ConsumeQueue文件中的有效数据偏移量和文件大小一样,则表示该ConsumeQueue文件的所有条目都是有效的if (mappedFileOffset == mappedFileSizeLogics) {// 下一个文件index++;// 如果是最后一个了,就退出if (index >= mappedFiles.size()) {log.info("recover last consume queue file over, last mapped file "+ mappedFile.getFileName());break;} else {// 获取下一个文件的数据mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next consume queue file, " + mappedFile.getFileName());}} else {log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "+ (processOffset + mappedFileOffset));break;}}// 文件名偏移量 + 这个文件最大有效的数据的偏移量, 就是这个queue最后一条消息的整体的偏移量processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);// 删除这个偏移量之后的数据this.mappedFileQueue.truncateDirtyFiles(processOffset);if (isExtReadEnable()) {this.consumeQueueExt.recover();log.info("Truncate consume queue extend file by max {}", maxExtAddr);this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}
}
正常恢复commitlog文件
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third fileint index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {// 生成DispatchRequest,验证本条消息是否合法DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal data// 正常,且size>0if (dispatchRequest.isSuccess() && size > 0) {// 更新mappedFileOffset的值加上本条消息长度mappedFileOffset += size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offset// 正常,size=0,说明到了文件末尾else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenlog.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}// Intermediate file read error// 消息错误else if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}// 即最后一条消息的尾部位置processOffset += mappedFileOffset;// 设置刷盘点this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);/** 删除文件最大有效数据偏移量processOffset之后的所有数据*/this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data// 如果consumequeue那边得到的commitlog最大的有效消息物理偏移量 大于等于 commitlog这边自己得出的自身的最大有效消息物理偏移量// 则以commitlog文件的有效数据为准,再次清除consumequeue文件中的脏数据if (maxPhyOffsetOfConsumeQueue >= processOffset) {log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {// Commitlog case files are deleted//如果不存在commitlog文件//那么重置刷盘最新位置,提交的最新位置,并且清除所有的consumequeue索引文件log.warn("The commitlog files are deleted, and delete the consume queue files");this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}
异常恢复commitlog文件
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {// recover by the minimum time stampboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index = mappedFiles.size() - 1;MappedFile mappedFile = null;// 从最后一个文件开始,从后往前检查、恢复for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// isMappedFileMatchedRecover 检查index对应的mappedFile文件是否正常if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;/*** 从最后一个正常的文件开始遍历*/while (true) {DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {// Normal dataif (size > 0) {mappedFileOffset += size;// 如果消息允许重复复制,默认为 falseif (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {// 如果 消息物理偏移量 小于 CommitLog的提交指针// 则调用CommitLogDispatcher重新构建当前消息的indexfile索引和consumequeue索引if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {this.defaultMessageStore.doDispatch(dispatchRequest);}} else {this.defaultMessageStore.doDispatch(dispatchRequest);}}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offset// 如果消息正常但是size为0,表示到了文件的末尾,则尝试跳到下一个commitlog文件进行检测else if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}} else {log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());break;}}processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);/** 删除文件最大有效数据偏移量processOffset之后的所有数据*/this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data// 如果consumequeue那边得到的commitlog最大的有效消息物理偏移量 大于等于 commitlog这边自己得出的自身的最大有效消息物理偏移量// 则以commitlog文件的有效数据为准,再次清除consumequeue文件中的脏数据if (maxPhyOffsetOfConsumeQueue >= processOffset) {log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}}// Commitlog case files are deletedelse {log.warn("The commitlog files are deleted, and delete the consume queue files");this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}