diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java index b9d00abc59a916a9f2b4e03105cd85a6706640ff..09bf10c327de1b9a0158ea7b32cc9a5c4d866305 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java @@ -51,27 +51,32 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange @Override public void run() { long start = System.currentTimeMillis(); try { - boolean succ = false; + boolean succ = true; log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); switch (role) { case CANDIDATE: if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { brokerController.changeToSlave(dLedgerCommitLog.getId()); } - succ = true; break; case FOLLOWER: brokerController.changeToSlave(dLedgerCommitLog.getId()); - succ = true; break; case LEADER: - while (dLegerServer.getMemberState().isLeader() - && (dLegerServer.getdLedgerStore().getLedgerEndIndex() != dLegerServer.getdLedgerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { - DLedgerUtils.sleep(100); + while (true) { + if (!dLegerServer.getMemberState().isLeader()) { + succ = false; + break; + } + if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) { + break; + } + if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex() + && messageStore.dispatchBehindBytes() == 0) { + break; + } + Thread.sleep(100); } - succ = dLegerServer.getMemberState().isLeader() - && dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex() - && messageStore.dispatchBehindBytes() == 0; if (succ) { messageStore.recoverTopicQueueTable(); brokerController.changeToMaster(BrokerRole.SYNC_MASTER); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index cdbd1a8bd85c2dd652224af0c5d41545dfe275ce..124e14e7806be37198f010caa0d675c37976be08 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -222,45 +222,50 @@ public class DefaultMessageStore implements MessageStore { lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); - - this.flushConsumeQueueService.start(); - this.commitLog.start(); - this.storeStatsService.start(); - - //calculate the reput offset from the consume queue itself - long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); - for (ConcurrentMap maps : this.consumeQueueTable.values()) { - for (ConsumeQueue logic : maps.values()) { - if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { - maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); + { + /** + * 1. calculate the reput offset according to the consume queue; + * 2. make sure the lagged messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. + */ + long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); + for (ConcurrentMap maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { + maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); + } } } + log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}", + maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); + long reputOffset; + if (this.getMessageStoreConfig().isDuplicationEnable()) { + reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset()); + } else { + reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset()); + } + if (reputOffset < 0) { + reputOffset = 0; + } + this.reputMessageService.setReputFromOffset(reputOffset); + this.reputMessageService.start(); + + //Finish dispatching the messages fall behind + //Note that, if dledger is enabled, the maxOffset maybe -1, so here only require the dispatchBehindBytes > 0 + while (this.dispatchBehindBytes() > 0) { + Thread.sleep(1000); + log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); + } + this.recoverTopicQueueTable(); } - log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}", - maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); - long reputOffset; - if (this.getMessageStoreConfig().isDuplicationEnable()) { - reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset()); - } else { - reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset()); - } - if (reputOffset < 0) { - reputOffset = 0; - } - this.reputMessageService.setReputFromOffset(reputOffset); - this.reputMessageService.start(); if (!messageStoreConfig.isEnableDLegerCommitLog()) { this.haService.start(); this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); } - //Finish dispatching the messages fall behind - while (this.dispatchBehindBytes() != 0) { - Thread.sleep(1000); - log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); - } - this.recoverTopicQueueTable(); + this.flushConsumeQueueService.start(); + this.commitLog.start(); + this.storeStatsService.start(); this.createTempFile(); this.addScheduleTask(); diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 30ecefff3cfec5abc88096040ffc143ee7a65b04..831f7783102301da1b12a04fcd79d4ae696d62cb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -86,6 +86,7 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); + dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean(); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; dLedgerServer = new DLedgerServer(dLedgerConfig); @@ -129,25 +130,26 @@ public class DLedgerCommitLog extends CommitLog { @Override public long getMaxOffset() { - if (this.dLedgerFileStore.getCommittedPos() != -1) { - return dLedgerFileStore.getCommittedPos(); - } else { - return this.dLedgerFileList.getMaxWrotePosition(); + if (dividedCommitlogOffset > 0 && dLedgerFileStore.getCommittedPos() < 0) { + return dividedCommitlogOffset; + } + if (dLedgerFileStore.getCommittedPos() == -1) { + return 0; } + return dLedgerFileStore.getCommittedPos(); } @Override public long getMinOffset() { - if (mappedFileQueue.getMappedFiles().isEmpty()) { - return dLedgerFileList.getMinOffset(); + if (!mappedFileQueue.getMappedFiles().isEmpty()) { + return mappedFileQueue.getMinOffset(); } - return mappedFileQueue.getMinOffset(); + return dLedgerFileList.getMinOffset(); } @Override public long getConfirmOffset() { - return this.dLedgerFileStore.getCommittedPos() == -1 ? getMaxOffset() - : this.dLedgerFileStore.getCommittedPos(); + return this.getMaxOffset(); } @Override diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 5c098d4b9aa84b42e658c4f172063e5b851b325c..a6acbdb8d9c6e94b76a387040a77e615513e3e4d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -174,7 +174,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Thread.sleep(1000); - Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0); + Assert.assertEquals(0, leaderStore.getCommitLog().getMaxOffset()); Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); @@ -183,6 +183,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0); leaderStore.destroy(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/ProduceAndConsumeTest.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java similarity index 100% rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/ProduceAndConsumeTest.java rename to test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java