diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 231bc0110403310c3ee260d25cbbb3fc65725c98..1c9af0493ea86ae3240a7e1930894ad54ff0cc47 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -227,10 +227,20 @@ public class DefaultMessageStore implements MessageStore { this.commitLog.start(); this.storeStatsService.start(); + //calculate the reput offset from the consume queue itself + long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); + for (ConcurrentMap maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { + maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); + } + } + } + if (this.getMessageStoreConfig().isDuplicationEnable()) { - this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); + this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset())); } else { - this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); + this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset())); } this.reputMessageService.start(); diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java index 7a15c0a473fcce71366b5ced378d5d2942f6ec14..c575aad90be315d5dabac5bddac8b7d2d6f02870 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java @@ -248,16 +248,15 @@ public class DLegerCommitLog extends CommitLog { } public long getConfirmOffset() { - return this.confirmOffset; + return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset() + : this.dLegerFileStore.getCommittedPos(); } public void setConfirmOffset(long phyOffset) { - this.confirmOffset = phyOffset; + log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset); } - - private void notifyMessageArriving() { } @@ -438,6 +437,8 @@ public class DLegerCommitLog extends CommitLog { return dLegerFileList.getMinOffset(); } + + public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);