From b9ed0ab917ead249abfec9ea97332197754f7971 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 20 Nov 2018 15:47:29 +0800 Subject: [PATCH] Fix tests --- .../org/apache/rocketmq/store/DefaultMessageStore.java | 10 +++++++--- .../rocketmq/store/config/MessageStoreConfig.java | 2 +- .../org/apache/rocketmq/store/ConsumeQueueTest.java | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index ecf0cbd3..828088a8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -239,12 +239,16 @@ public class DefaultMessageStore implements MessageStore { log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); - + long reputOffset; if (this.getMessageStoreConfig().isDuplicationEnable()) { - this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset())); + reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset()); } else { - this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset())); + reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset()); + } + if (reputOffset < 0) { + reputOffset = 0; } + this.reputMessageService.setReputFromOffset(reputOffset); this.reputMessageService.start(); if (!messageStoreConfig.isEnableDLegerCommitLog()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 63c7c313..75d2c8eb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -143,7 +143,7 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; - private boolean enableDLegerCommitLog; + private boolean enableDLegerCommitLog = false; private String dLegerGroup; private String dLegerPeers; private String dLegerSelfId; diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 7e01b851..ede25ea5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import static org.assertj.core.api.Assertions.assertThat; import org.junit.Test; +import static org.assertj.core.api.Assertions.assertThat; + public class ConsumeQueueTest { private static final String msg = "Once, there was a chance for me!"; -- GitLab