diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 124e14e7806be37198f010caa0d675c37976be08..254f87a6529f1af782679390f76b8745b2ef445a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -224,8 +224,10 @@ public class DefaultMessageStore implements MessageStore { lockFile.getChannel().force(true); { /** - * 1. calculate the reput offset according to the consume queue; - * 2. make sure the lagged messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. + * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; + * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; + * 3. Calculate the reput offset according to the consume queue; + * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. */ long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); for (ConcurrentMap maps : this.consumeQueueTable.values()) { @@ -235,23 +237,34 @@ public class DefaultMessageStore implements MessageStore { } } } - log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}", - maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); - long reputOffset; - if (this.getMessageStoreConfig().isDuplicationEnable()) { - reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset()); - } else { - reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset()); - } - if (reputOffset < 0) { - reputOffset = 0; - } - this.reputMessageService.setReputFromOffset(reputOffset); + if (maxPhysicalPosInLogicQueue < 0) { + maxPhysicalPosInLogicQueue = 0; + } + if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { + maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); + /** + * This happens in following conditions: + * 1. If someone removes all the consumequeue files or the disk get damaged. + * 2. Launch a new broker, and copy the commitlog from other brokers. + * + * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. + * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. + */ + log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); + } + log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", + maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); + this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); this.reputMessageService.start(); - //Finish dispatching the messages fall behind - //Note that, if dledger is enabled, the maxOffset maybe -1, so here only require the dispatchBehindBytes > 0 - while (this.dispatchBehindBytes() > 0) { + /** + * 1. Finish dispatching the messages fall behind, then to start other services. + * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 + */ + while (true) { + if (dispatchBehindBytes() <= 0) { + break; + } Thread.sleep(1000); log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3e1d25414a2d28cb87a4744728643b0857a704a8..1f029c51a0b52c94aa124eecace886e3b7393097 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -135,13 +135,13 @@ public class DLedgerCommitLog extends CommitLog { @Override public long getMaxOffset() { - if (dividedCommitlogOffset > 0 && dLedgerFileStore.getCommittedPos() < 0) { - return dividedCommitlogOffset; + if (dLedgerFileStore.getCommittedPos() > 0) { + return dLedgerFileStore.getCommittedPos(); } - if (dLedgerFileStore.getCommittedPos() == -1) { - return 0; + if (dLedgerFileList.getMinOffset() > 0) { + return dLedgerFileList.getMinOffset(); } - return dLedgerFileStore.getCommittedPos(); + return 0; } @Override diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java index e09f579e2a80df7240b2400b9de78922326f2fcc..540486d2aa2b6e689ac6ad31a1453f06c7d15fbd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -85,6 +85,8 @@ public class MixCommitlogTest extends MessageStoreTestBase { DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getMaxOffset()); Thread.sleep(2000); doPutMessages(dledgerStore, topic, 0, 1000, 1000); Thread.sleep(500);