From 3d778ef625d6fd975ac710973b0fdfe4e58f7936 Mon Sep 17 00:00:00 2001 From: huangli Date: Wed, 11 Dec 2019 10:30:39 +0800 Subject: [PATCH] Avoid message in schedule queue enter half message queue (#1626) --- .../rocketmq/store/schedule/ScheduleMessageService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 50a48d4d..3be8cbc8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -305,6 +306,11 @@ public class ScheduleMessageService extends ConfigManager { if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); + if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", + msgInner.getTopic(), msgInner); + continue; + } PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); -- GitLab