提交 0c69acaf 编写于 作者: D dongeforever

Get reput offset from logic queues

上级 5a19e6ee
...@@ -227,10 +227,20 @@ public class DefaultMessageStore implements MessageStore { ...@@ -227,10 +227,20 @@ public class DefaultMessageStore implements MessageStore {
this.commitLog.start(); this.commitLog.start();
this.storeStatsService.start(); this.storeStatsService.start();
//calculate the reput offset from the consume queue itself
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (this.getMessageStoreConfig().isDuplicationEnable()) { if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset()));
} else { } else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset()));
} }
this.reputMessageService.start(); this.reputMessageService.start();
......
...@@ -248,16 +248,15 @@ public class DLegerCommitLog extends CommitLog { ...@@ -248,16 +248,15 @@ public class DLegerCommitLog extends CommitLog {
} }
public long getConfirmOffset() { public long getConfirmOffset() {
return this.confirmOffset; return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset()
: this.dLegerFileStore.getCommittedPos();
} }
public void setConfirmOffset(long phyOffset) { public void setConfirmOffset(long phyOffset) {
this.confirmOffset = phyOffset; log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset);
} }
private void notifyMessageArriving() { private void notifyMessageArriving() {
} }
...@@ -438,6 +437,8 @@ public class DLegerCommitLog extends CommitLog { ...@@ -438,6 +437,8 @@ public class DLegerCommitLog extends CommitLog {
return dLegerFileList.getMinOffset(); return dLegerFileList.getMinOffset();
} }
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData(); int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0); MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册