提交 b9ed0ab9 编写于 作者: D dongeforever

Fix tests

上级 866e3090
...@@ -239,12 +239,16 @@ public class DefaultMessageStore implements MessageStore { ...@@ -239,12 +239,16 @@ public class DefaultMessageStore implements MessageStore {
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}", log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
long reputOffset;
if (this.getMessageStoreConfig().isDuplicationEnable()) { if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset())); reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset());
} else { } else {
this.reputMessageService.setReputFromOffset(Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset())); reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset());
}
if (reputOffset < 0) {
reputOffset = 0;
} }
this.reputMessageService.setReputFromOffset(reputOffset);
this.reputMessageService.start(); this.reputMessageService.start();
if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (!messageStoreConfig.isEnableDLegerCommitLog()) {
......
...@@ -143,7 +143,7 @@ public class MessageStoreConfig { ...@@ -143,7 +143,7 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5; private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false; private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog; private boolean enableDLegerCommitLog = false;
private String dLegerGroup; private String dLegerGroup;
private String dLegerPeers; private String dLegerPeers;
private String dLegerSelfId; private String dLegerSelfId;
......
...@@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll; ...@@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeQueueTest { public class ConsumeQueueTest {
private static final String msg = "Once, there was a chance for me!"; private static final String msg = "Once, there was a chance for me!";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册