diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 03a5dbaf5cfb1266de0775448af2f3c451f185e4..c229ce36ec105e70eab22fa3b7c2fd751c93c760 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -212,6 +212,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return CompletableFuture.completedFuture(response); } + if (requestHeader.getOriginTopic() != null + && !msgExt.getTopic().equals(requestHeader.getOriginTopic())) { + //here just do some fence in case of some unexpected offset is income + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset()); + return CompletableFuture.completedFuture(response); + } + final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (null == retryTopic) { MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index d00c4e84a68f82b2c0b1d07fe109beb4f37d97bd..9d158c66c84e593d5ce90147b2a6248b9f4825de 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -715,48 +715,42 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { sendMessageBack(msg, delayLevel, null, mq); } + private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { - String desBrokerName = brokerName; - if (mq != null) { - String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); - if (tmpBrokerName != null) { - desBrokerName = tmpBrokerName; - } - } - if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) { - desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId()))); - } - - String brokerAddr = null; - if (null != desBrokerName) { - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName); + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName) + || (mq != null && MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName()))) { + sendMessageBackAsNormalMessage(msg); } else { - RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) + : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, + this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, - this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); + sendMessageBackAsNormalMessage(msg); + } finally { + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); + } + } - Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); + private void sendMessageBackAsNormalMessage(MessageExt msg) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); - String originMsgId = MessageAccessor.getOriginMessageId(msg); - MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); + String originMsgId = MessageAccessor.getOriginMessageId(msg); + MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); - newMsg.setFlag(msg.getFlag()); - MessageAccessor.setProperties(newMsg, msg.getProperties()); - MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); - MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); - MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); - MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); - newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); + newMsg.setFlag(msg.getFlag()); + MessageAccessor.setProperties(newMsg, msg.getProperties()); + MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); + MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); + MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); + MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); + newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.mQClientFactory.getDefaultMQProducer().send(newMsg); - } finally { - msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); - } + this.mQClientFactory.getDefaultMQProducer().send(newMsg); } void ackAsync(MessageExt message, String consumerGroup) { diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" index 9d41f00b8e7b5dc2ed60cfb4f89c1e0a124fd9e9..d2e8e10453f76f39c2707bfdeaeaec1e6f9dba92 100644 --- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" +++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" @@ -385,10 +385,9 @@ logicOffset的决策,依赖于上一个 PhysicalQueue 的最大位点。 * 所有位点相关的API,需要考虑 MappingItem endOffset,因为超过了 endOffset 可能已经不属于 当前 LogicQueue 了 * 新建 MappingItem,需要先获取 旧 MappingItem 的 endOffset -当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++ +当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++。 后续实现,可以考虑复用已经被清除掉的Physical,也即已经没有数据,位点从0开始。 - #### 备机更新映射 当前,admin操作都是要求在Master操作的。因此,没有这个问题。 Command操作时,提前预判Master是否存在,如果不存在,则提前报错,减少中间失败率。 @@ -411,10 +410,9 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前 #### 拉取消息时的 中断问题 当1个 PhysicalQueue 被拉取干净时,需要修正 nextBeginOffset 到下一个 PhysicalQueue。 如果没有处理好,则直接会导致拉取中断,无法前进。 - - #### pullResult 位点由谁设置的问题 -类似于Batch,由客户端设置,避免服务端解开消息。 +类似于Batch,由客户端设置,避免服务端解开消息: +在PullResultExt中新增字段 offsetDelta。 #### 远程读的性能问题 从实战经验来看,性能损耗几乎不计。 #### 使用习惯的改变