From 84d2260b87bf027fbf6fccb570ca6750fd78c9d3 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Mon, 29 Apr 2019 10:38:12 +0800 Subject: [PATCH] [ISSUE #1164]Fix Consumer Instance can't consume message from slave when cluster is in the high level tps and master has been killed. (#1181) * [issue#1164]return the codes to original reput method part. * [issue#1164]fix issue that Consumer Instance can't consume message from slave when cluster is in the high level tps and master has been killed. * [issue#1164]if the broker is a master node,then modify reputFromOffset correctly. * [issue#1164]add some coding comments. --- .../java/org/apache/rocketmq/store/DefaultMessageStore.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 70efabf1..f254dd15 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1864,10 +1864,12 @@ public class DefaultMessageStore implements MessageStore { this.reputFromOffset += size; } else { doNext = false; - if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) { + // If user open the dledger pattern or the broker is master node, + // it will not ignore the exception and fix the reputFromOffset variable + if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || + DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); - this.reputFromOffset += result.getSize() - readSize; } } -- GitLab