diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 831f7783102301da1b12a04fcd79d4ae696d62cb..b6aaa5b0471f81cc10f5e7f9acaa82ee4845289f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -70,11 +70,6 @@ public class DLedgerCommitLog extends CommitLog { private long dividedCommitlogOffset = -1; - //The old commitlog should be deleted before the dledger commitlog - private final boolean originalDledgerEnableForceClean; - private final AtomicBoolean hasSetOriginalDledgerEnableForceClean = new AtomicBoolean(false); - - private boolean isInrecoveringOldCommitlog = false; public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { @@ -87,7 +82,7 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); - originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean(); + dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; dLedgerServer = new DLedgerServer(dLedgerConfig); dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); @@ -112,6 +107,17 @@ public class DLedgerCommitLog extends CommitLog { return true; } + private void refreshConfig() { + dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); + dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); + dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); + } + + private void disableDeleteDledger() { + dLedgerConfig.setEnableDiskForceClean(false); + dLedgerConfig.setFileReservedHours(Integer.MAX_VALUE); + } + @Override public void start() { dLedgerServer.startup(); @@ -177,11 +183,11 @@ public class DLedgerCommitLog extends CommitLog { final boolean cleanImmediately ) { if (mappedFileQueue.getMappedFiles().isEmpty()) { - if (hasSetOriginalDledgerEnableForceClean.compareAndSet(false, true)) { - dLedgerConfig.setEnableDiskForceClean(originalDledgerEnableForceClean); - } + refreshConfig(); //To prevent too much log in defaultMessageStore return Integer.MAX_VALUE; + } else { + disableDeleteDledger(); } int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); if (count > 0 || mappedFileQueue.getMappedFiles().size() != 1) { @@ -251,16 +257,14 @@ public class DLedgerCommitLog extends CommitLog { return null; } - private void recover(long maxPhyOffsetOfConsumeQueue, boolean lastOk) { + private void recover(long maxPhyOffsetOfConsumeQueue) { dLedgerFileStore.load(); if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { - dLedgerConfig.setEnableDiskForceClean(false); - } else { - hasSetOriginalDledgerEnableForceClean.set(true); + disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data @@ -277,7 +281,6 @@ public class DLedgerCommitLog extends CommitLog { isInrecoveringOldCommitlog = false; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { - hasSetOriginalDledgerEnableForceClean.set(true); return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); @@ -309,12 +312,12 @@ public class DLedgerCommitLog extends CommitLog { @Override public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { - recover(maxPhyOffsetOfConsumeQueue, true); + recover(maxPhyOffsetOfConsumeQueue); } @Override public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { - recover(maxPhyOffsetOfConsumeQueue, false); + recover(maxPhyOffsetOfConsumeQueue); } @Override diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java index 8fb62f3b231ee63228f298b32d324ebeb59dfe30..e09f579e2a80df7240b2400b9de78922326f2fcc 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -133,6 +133,7 @@ public class MixCommitlogTest extends MessageStoreTestBase { { DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); + Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable()); Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); Thread.sleep(2000); @@ -143,6 +144,7 @@ public class MixCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); Assert.assertEquals(0, dledgerStore.getMinPhyOffset()); maxPhysicalOffset = dledgerStore.getMaxPhyOffset(); + Assert.assertTrue(maxPhysicalOffset > 0); doGetMessages(dledgerStore, topic, 0, 2000, 0); @@ -156,13 +158,15 @@ public class MixCommitlogTest extends MessageStoreTestBase { } Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset()); Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset()); + + Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable()); Assert.assertTrue(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); - dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().setEnableDiskForceClean(false); + //Test fresh + dledgerStore.getMessageStoreConfig().setCleanFileForciblyEnable(false); for (int i = 0; i < 100; i++) { Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true)); } - //should not change the value Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); doGetMessages(dledgerStore, topic, 0, 1000, 1000); dledgerStore.shutdown();