diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d3caf8d84a8f317e619e6c9b14d07962385740e7..c83d19633a7d6a362ecc808120abea4ee997d19f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -65,7 +66,7 @@ public class CommitLog { public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -128,10 +129,10 @@ public class CommitLog { } public int deleteExpiredFile( - final long expiredTime, - final int deleteFilesInterval, - final long intervalForcibly, - final boolean cleanImmediately + final long expiredTime, + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately ) { return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); } @@ -158,7 +159,7 @@ public class CommitLog { /** * When the normal exit, data recovery, all memory data have been flush */ - public void recoverNormally() { + public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { @@ -171,6 +172,7 @@ public class CommitLog { ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; + boolean isDamaged = false; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); @@ -198,6 +200,7 @@ public class CommitLog { // Intermediate file read error else if (!dispatchRequest.isSuccess()) { log.info("recover physics file end, " + mappedFile.getFileName()); + isDamaged = true; break; } } @@ -206,6 +209,12 @@ public class CommitLog { this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); + + // Clear ConsumeQueue redundant data + if (isDamaged && maxPhyOffsetOfConsumeQueue >= processOffset) { + log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); + } } } @@ -225,7 +234,7 @@ public class CommitLog { * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, - final boolean readBody) { + final boolean readBody) { try { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); @@ -321,7 +330,7 @@ public class CommitLog { if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, - storeTimestamp); + storeTimestamp); } } } @@ -335,24 +344,24 @@ public class CommitLog { doNothingForDeadCode(byteBuffer1); doNothingForDeadCode(byteBuffer2); log.error( - "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", - totalSize, readLength, bodyLen, topicLen, propertiesLength); + "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", + totalSize, readLength, bodyLen, topicLen, propertiesLength); return new DispatchRequest(totalSize, false/* success */); } return new DispatchRequest( - topic, - queueId, - physicOffset, - totalSize, - tagsCode, - storeTimestamp, - queueOffset, - keys, - uniqKey, - sysFlag, - preparedTransactionOffset, - propertiesMap + topic, + queueId, + physicOffset, + totalSize, + tagsCode, + storeTimestamp, + queueOffset, + keys, + uniqKey, + sysFlag, + preparedTransactionOffset, + propertiesMap ); } catch (Exception e) { } @@ -362,23 +371,23 @@ public class CommitLog { private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 //TOTALSIZE - + 4 //MAGICCODE - + 4 //BODYCRC - + 4 //QUEUEID - + 4 //FLAG - + 8 //QUEUEOFFSET - + 8 //PHYSICALOFFSET - + 4 //SYSFLAG - + 8 //BORNTIMESTAMP - + 8 //BORNHOST - + 8 //STORETIMESTAMP - + 8 //STOREHOSTADDRESS - + 4 //RECONSUMETIMES - + 8 //Prepared Transaction Offset - + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY - + 1 + topicLength //TOPIC - + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength - + 0; + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + 8 //BORNHOST + + 8 //STORETIMESTAMP + + 8 //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY + + 1 + topicLength //TOPIC + + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength + + 0; return msgLen; } @@ -390,7 +399,7 @@ public class CommitLog { this.confirmOffset = phyOffset; } - public void recoverAbnormally() { + public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // recover by the minimum time stamp boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List mappedFiles = this.mappedFileQueue.getMappedFiles(); @@ -418,41 +427,41 @@ public class CommitLog { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); - // Normal data - if (size > 0) { - mappedFileOffset += size; + if (dispatchRequest.isSuccess()) { + // Normal data + if (size > 0) { + mappedFileOffset += size; - if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { - if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { + if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { + this.defaultMessageStore.doDispatch(dispatchRequest); + } + } else { this.defaultMessageStore.doDispatch(dispatchRequest); } - } else { - this.defaultMessageStore.doDispatch(dispatchRequest); } - } - // Intermediate file read error - else if (size == -1) { + // Come the end of the file, switch to the next file + // Since the return 0 representatives met last hole, this can + // not be included in truncate offset + else if (size == 0) { + index++; + if (index >= mappedFiles.size()) { + // The current branch under normal circumstances should + // not happen + log.info("recover physics file over, last mapped file " + mappedFile.getFileName()); + break; + } else { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mappedFileOffset = 0; + log.info("recover next physics file, " + mappedFile.getFileName()); + } + } + } else { log.info("recover physics file end, " + mappedFile.getFileName()); break; } - // Come the end of the file, switch to the next file - // Since the return 0 representatives met last hole, this can - // not be included in truncate offset - else if (size == 0) { - index++; - if (index >= mappedFiles.size()) { - // The current branch under normal circumstances should - // not happen - log.info("recover physics file over, last mapped file " + mappedFile.getFileName()); - break; - } else { - mappedFile = mappedFiles.get(index); - byteBuffer = mappedFile.sliceByteBuffer(); - processOffset = mappedFile.getFileFromOffset(); - mappedFileOffset = 0; - log.info("recover next physics file, " + mappedFile.getFileName()); - } - } } processOffset += mappedFileOffset; @@ -461,7 +470,10 @@ public class CommitLog { this.mappedFileQueue.truncateDirtyFiles(processOffset); // Clear ConsumeQueue redundant data - this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); + if (maxPhyOffsetOfConsumeQueue >= processOffset) { + log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); + } } // Commitlog case files are deleted else { @@ -485,18 +497,18 @@ public class CommitLog { } if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() - && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { + && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { log.info("find check timestamp, {} {}", - storeTimestamp, - UtilAll.timeMillisToHumanString(storeTimestamp)); + storeTimestamp, + UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } else { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { log.info("find check timestamp, {} {}", - storeTimestamp, - UtilAll.timeMillisToHumanString(storeTimestamp)); + storeTimestamp, + UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } @@ -532,7 +544,7 @@ public class CommitLog { final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE - || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { @@ -638,7 +650,7 @@ public class CommitLog { boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() - + " client address: " + messageExt.getBornHostString()); + + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { @@ -665,10 +677,10 @@ public class CommitLog { service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = - request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " - + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); + + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } @@ -905,7 +917,7 @@ public class CommitLog { int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = - CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); + CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { @@ -954,7 +966,7 @@ public class CommitLog { int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = - CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); + CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; @@ -1175,7 +1187,7 @@ public class CommitLog { } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBrokerInner msgInner) { + final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET @@ -1215,7 +1227,7 @@ public class CommitLog { * Serialize message */ final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; @@ -1234,7 +1246,7 @@ public class CommitLog { // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength - + ", maxMessageSize: " + this.maxMessageSize); + + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } @@ -1250,7 +1262,7 @@ public class CommitLog { final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), - queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // Initialization of storage space @@ -1303,7 +1315,7 @@ public class CommitLog { byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, - msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: @@ -1321,7 +1333,7 @@ public class CommitLog { } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBatch messageExtBatch) { + final MessageExtBatch messageExtBatch) { byteBuffer.mark(); //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1353,7 +1365,7 @@ public class CommitLog { // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen - + ", maxMessageSize: " + this.maxMessageSize); + + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } totalMsgLen += msgLen; @@ -1371,7 +1383,7 @@ public class CommitLog { byteBuffer.reset(); //ignore the previous appended messages byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), - beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //move to add queue offset and commitlog offset messagesByteBuff.position(msgPos + 20); @@ -1395,7 +1407,7 @@ public class CommitLog { byteBuffer.put(messagesByteBuff); messageExtBatch.setEncodedBuff(null); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), - messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); CommitLog.this.topicQueueTable.put(key, queueOffset); @@ -1454,7 +1466,7 @@ public class CommitLog { // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen - + ", maxMessageSize: " + this.maxMessageSize); + + ", maxMessageSize: " + this.maxMessageSize); throw new RuntimeException("message size exceeded"); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1ade7c2838e0382eb8223646fd6ad31a74b77def..200f8ad51114ae4aaa740737eab79e84b6036837 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; @@ -93,7 +94,7 @@ public class DefaultMessageStore implements MessageStore { private final SystemClock systemClock = new SystemClock(); private final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); private final BrokerStatsManager brokerStatsManager; private final MessageArrivingListener messageArrivingListener; private final BrokerConfig brokerConfig; @@ -113,7 +114,7 @@ public class DefaultMessageStore implements MessageStore { boolean shutDownNormal = false; public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, - final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { + final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; @@ -184,7 +185,7 @@ public class DefaultMessageStore implements MessageStore { if (result) { this.storeCheckpoint = - new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); @@ -437,8 +438,8 @@ public class DefaultMessageStore implements MessageStore { } public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, - final int maxMsgNums, - final MessageFilter messageFilter) { + final int maxMsgNums, + final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; @@ -509,7 +510,7 @@ public class DefaultMessageStore implements MessageStore { boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), - isInDisk)) { + isInDisk)) { break; } @@ -521,13 +522,13 @@ public class DefaultMessageStore implements MessageStore { } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", - tagsCode, offsetPy, sizePy, topic, group); + tagsCode, offsetPy, sizePy, topic, group); isTagsCodeLegal = false; } } if (messageFilter != null - && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { + && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } @@ -546,7 +547,7 @@ public class DefaultMessageStore implements MessageStore { } if (messageFilter != null - && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { + && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } @@ -570,7 +571,7 @@ public class DefaultMessageStore implements MessageStore { long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE - * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { @@ -580,7 +581,7 @@ public class DefaultMessageStore implements MessageStore { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " - + maxOffset + ", but access logic queue failed."); + + maxOffset + ", but access logic queue failed."); } } } else { @@ -910,8 +911,8 @@ public class DefaultMessageStore implements MessageStore { for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", - cq.getTopic(), - cq.getQueueId() + cq.getTopic(), + cq.getQueueId() ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); @@ -941,20 +942,20 @@ public class DefaultMessageStore implements MessageStore { if (maxCLOffsetInConsumeQueue == -1) { log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", - nextQT.getValue().getTopic(), - nextQT.getValue().getQueueId(), - nextQT.getValue().getMaxPhysicOffset(), - nextQT.getValue().getMinLogicOffset()); + nextQT.getValue().getTopic(), + nextQT.getValue().getQueueId(), + nextQT.getValue().getMaxPhysicOffset(), + nextQT.getValue().getMinLogicOffset()); } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( - "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", - topic, - nextQT.getKey(), - minCommitLogOffset, - maxCLOffsetInConsumeQueue); + "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", + topic, + nextQT.getKey(), + minCommitLogOffset, + maxCLOffsetInConsumeQueue); DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), - nextQT.getValue().getQueueId()); + nextQT.getValue().getQueueId()); nextQT.getValue().destroy(); itQT.remove(); @@ -970,7 +971,7 @@ public class DefaultMessageStore implements MessageStore { } public Map getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, - SocketAddress storeHost) { + SocketAddress storeHost) { Map messageIds = new HashMap(); if (this.shutdown) { return messageIds; @@ -995,7 +996,7 @@ public class DefaultMessageStore implements MessageStore { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = - MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); + MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); messageIds.put(msgId, nextOffset++); if (nextOffset > maxOffset) { return messageIds; @@ -1091,11 +1092,11 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), - this); + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; @@ -1195,7 +1196,7 @@ public class DefaultMessageStore implements MessageStore { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" - + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; + + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } @@ -1256,11 +1257,11 @@ public class DefaultMessageStore implements MessageStore { continue; } ConsumeQueue logic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), - this); + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; @@ -1276,12 +1277,12 @@ public class DefaultMessageStore implements MessageStore { } private void recover(final boolean lastExitOK) { - this.recoverConsumeQueue(); + long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { - this.commitLog.recoverNormally(); + this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { - this.commitLog.recoverAbnormally(); + this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } this.recoverTopicQueueTable(); @@ -1306,12 +1307,18 @@ public class DefaultMessageStore implements MessageStore { } } - private void recoverConsumeQueue() { + private long recoverConsumeQueue() { + long maxPhysicOffset = -1; for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); + if (logic.getMaxPhysicOffset() > maxPhysicOffset) { + maxPhysicOffset = logic.getMaxPhysicOffset(); + } } } + + return maxPhysicOffset; } private void recoverTopicQueueTable() { @@ -1438,10 +1445,10 @@ public class DefaultMessageStore implements MessageStore { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; private final double diskSpaceWarningLevelRatio = - Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); private final double diskSpaceCleanForciblyRatio = - Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); private long lastRedeleteTimestamp = 0; private volatile int manualDeleteFileSeveralTimes = 0; @@ -1481,16 +1488,16 @@ public class DefaultMessageStore implements MessageStore { boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", - fileReservedTime, - timeup, - spacefull, - manualDeleteFileSeveralTimes, - cleanAtOnce); + fileReservedTime, + timeup, + spacefull, + manualDeleteFileSeveralTimes, + cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, - destroyMapedFileIntervalForcibly, cleanAtOnce); + destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); @@ -1504,7 +1511,7 @@ public class DefaultMessageStore implements MessageStore { if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; int destroyMapedFileIntervalForcibly = - DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); + DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } } @@ -1556,7 +1563,7 @@ public class DefaultMessageStore implements MessageStore { { String storePathLogics = StorePathConfigHelper - .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); + .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); if (logicsRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); @@ -1726,7 +1733,7 @@ public class DefaultMessageStore implements MessageStore { if (this.isCommitLogAvailable()) { log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", - DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); + DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); } super.shutdown(); @@ -1744,7 +1751,7 @@ public class DefaultMessageStore implements MessageStore { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() - && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { + && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } @@ -1755,7 +1762,7 @@ public class DefaultMessageStore implements MessageStore { for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = - DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); + DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { @@ -1763,21 +1770,21 @@ public class DefaultMessageStore implements MessageStore { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() - && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { + && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), - dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, - dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), - dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, + dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), + dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) - .addAndGet(dispatchRequest.getMsgSize()); + .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) + .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); @@ -1792,7 +1799,7 @@ public class DefaultMessageStore implements MessageStore { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", - this.reputFromOffset); + this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 20f94f09a0593c0f319b1af06763af22e34bf12c..57b6999c43795781ae20761e221ea897d32cb6a0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -18,9 +18,12 @@ package org.apache.rocketmq.store; import java.io.File; +import java.io.RandomAccessFile; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.junit.After; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; @@ -171,6 +175,120 @@ public class DefaultMessageStoreTest { assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); } + @Test + public void testRecover() throws Exception { + String topic = "recoverTopic"; + MessageBody = StoreMessage.getBytes(); + for (int i = 0; i < 100; i++) { + MessageExtBrokerInner messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + } + + Thread.sleep(100);//wait for build consumer queue + long maxPhyOffset = messageStore.getMaxPhyOffset(); + long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); + + //1.just reboot + messageStore.shutdown(); + messageStore = buildMessageStore(); + boolean load = messageStore.load(); + assertTrue(load); + messageStore.start(); + assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset()); + assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + + //2.damage commitlog and reboot normal + for (int i = 0; i < 100; i++) { + MessageExtBrokerInner messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + } + Thread.sleep(100); + long secondLastPhyOffset = messageStore.getMaxPhyOffset(); + long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); + + MessageExtBrokerInner messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + + messageStore.shutdown(); + + //damage last message + damageCommitlog(secondLastPhyOffset); + + //reboot + messageStore = buildMessageStore(); + load = messageStore.load(); + assertTrue(load); + messageStore.start(); + assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset()); + assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + + //3.damage commitlog and reboot abnormal + for (int i = 0; i < 100; i++) { + messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + } + Thread.sleep(100); + secondLastPhyOffset = messageStore.getMaxPhyOffset(); + secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); + + messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + messageStore.shutdown(); + + //damage last message + damageCommitlog(secondLastPhyOffset); + //add abort file + String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir()); + File file = new File(fileName); + MappedFile.ensureDirOK(file.getParent()); + file.createNewFile(); + + messageStore = buildMessageStore(); + load = messageStore.load(); + assertTrue(load); + messageStore.start(); + assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset()); + assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + + //message write again + for (int i = 0; i < 100; i++) { + messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + messageStore.putMessage(messageExtBrokerInner); + } + } + + private void damageCommitlog(long offset) throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000"); + + FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel(); + MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10); + + int bodyLen = mappedByteBuffer.getInt((int) offset + 84); + int topicLenIndex = (int) offset + 84 + bodyLen + 4; + mappedByteBuffer.position(topicLenIndex); + mappedByteBuffer.putInt(0); + mappedByteBuffer.putInt(0); + mappedByteBuffer.putInt(0); + mappedByteBuffer.putInt(0); + + mappedByteBuffer.force(); + fileChannel.force(true); + fileChannel.close(); + } + private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,