diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 50a48d4de9c26be4144a9fc938a76a5e6dcef096..3be8cbc809ed59f6d39de94cd407a01741a899be 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -305,6 +306,11 @@ public class ScheduleMessageService extends ConfigManager { if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); + if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", + msgInner.getTopic(), msgInner); + continue; + } PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);