diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 655c4de4c74ec8e6310e4ca7f5dce2fef84a59e5..cb6c5f6565cf01ae2372dd6c61e77c98f912da5e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -234,6 +234,11 @@ public class DLedgerCommitLog extends CommitLog { dLedgerFileStore.load(); if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile != null) { + dLedgerConfig.setEnableDiskForceClean(false); + dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); + } return; } //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog @@ -248,7 +253,6 @@ public class DLedgerCommitLog extends CommitLog { if (mappedFile == null) { return; } - dLedgerConfig.setEnableDiskForceClean(false); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; @@ -260,6 +264,7 @@ public class DLedgerCommitLog extends CommitLog { } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } + dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { @@ -465,7 +470,7 @@ public class DLedgerCommitLog extends CommitLog { @Override public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { - return getMessage(offset, size); + return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 28a1d8ea1015e9831a7015ce7a9d04efb67a845b..47d4b3874a15e0b3f11a5e91429f4429f30897b5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -23,7 +23,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); - { DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false); Thread.sleep(1000); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index d671287d89cd28ee5bbd584df7988fb319a09c37..bdc9761eb1de0eb00cc5755f2e2fb7e5d23eabfd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -3,7 +3,10 @@ package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dledger.DLedgerServer; import java.io.File; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; @@ -89,4 +92,14 @@ public class MessageStoreTestBase extends StoreTestBase { } } + protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) { + for (int i = 0; i < num; i++) { + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, beginLogicsOffset + i, 3, null); + Assert.assertNotNull(getMessageResult); + Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty()); + MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0)); + Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset()); + } + } + } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java index cba9fe9e0fd5ce22e5da792b309aab7618b6e543..5ade1dfd62b6cce3352361b746cbd015c2d7e2de 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -5,7 +5,7 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.junit.Assert; import org.junit.Test; -public class MixCommitlogTest extends DLedgerCommitlogTest { +public class MixCommitlogTest extends MessageStoreTestBase { @Test @@ -25,6 +25,7 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { Assert.assertEquals(0, originalStore.dispatchBehindBytes()); dividedOffset = originalStore.getCommitLog().getMaxOffset(); dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + doGetMessages(originalStore, topic, 0, 1000, 0); originalStore.shutdown(); } { @@ -33,11 +34,13 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes()); + doGetMessages(recoverOriginalStore, topic, 0, 1000, 0); recoverOriginalStore.shutdown(); } { DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); Thread.sleep(2000); doPutMessages(dledgerStore, topic, 0, 1000, 1000); @@ -45,8 +48,23 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + doGetMessages(dledgerStore, topic, 0, 2000, 0); dledgerStore.shutdown(); } + { + DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog(); + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Thread.sleep(2000); + doPutMessages(recoverDledgerStore, topic, 0, 1000, 2000); + Thread.sleep(500); + Assert.assertEquals(0, recoverDledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(3000, recoverDledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, recoverDledgerStore.dispatchBehindBytes()); + doGetMessages(recoverDledgerStore, topic, 0, 3000, 0); + recoverDledgerStore.shutdown(); + } } }