diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 04492e3d59ca4428dc2d6462136390ecb0b6c126..ce4c94ad7241a5d29ac67180c24b1d3c8dfacac9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.InvokeCallback; @@ -191,10 +190,10 @@ public class MQAdminImpl { if (logicalQueueRouteData != null) { mq = logicalQueueRouteData.getMessageQueue(); } - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (brokerAddr != null) { @@ -227,10 +226,10 @@ public class MQAdminImpl { previousQueueRouteData = maxQueueRouteData; mq = maxQueueRouteData.getMessageQueue(); } - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (brokerAddr != null) { @@ -256,10 +255,10 @@ public class MQAdminImpl { mq = minQueueRouteData.getMessageQueue(); } - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (brokerAddr != null) { @@ -298,10 +297,10 @@ public class MQAdminImpl { mq = minQueueRouteData.getMessageQueue(); } - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (brokerAddr != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index d23664632cd6601b3e180682e492b5d52764e3ae..dafa555925e4753adfcc4634760875b1e8f9fd7e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -718,11 +718,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { + String desBrokerName = brokerName; + if (mq != null) { + String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); + if (tmpBrokerName != null) { + desBrokerName = tmpBrokerName; + } + } String brokerAddr = null; - if (null != mq) { - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); - } else if (null != brokerName) { - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); + if (null != desBrokerName) { + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName); } else { RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index a7d521f415e9b3eef878a76d7c8f650e91a4a88f..950eae787d34ae6264120ba31ec4577f1824b935 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1055,16 +1055,11 @@ public class MQClientInstance { return mq.getBrokerName(); } - public FindBrokerResult findBrokerAddressInAdmin(final MessageQueue mq) { - String brokerName = getBrokerNameFromMessageQueue(mq); + + public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) { if (brokerName == null) { return null; - } else { - return findBrokerAddressInAdmin(brokerName); } - } - - public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) { String brokerAddr = null; boolean slave = false; boolean found = false; @@ -1094,16 +1089,11 @@ public class MQClientInstance { return null; } - public String findBrokerAddressInPublish(final MessageQueue mq) { - String brokerName = getBrokerNameFromMessageQueue(mq); + + public String findBrokerAddressInPublish(final String brokerName) { if (brokerName == null) { return null; - } else { - return findBrokerAddressInPublish(brokerName); } - } - //This is used for retry only - public String findBrokerAddressInPublish(final String brokerName) { HashMap map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); @@ -1112,20 +1102,15 @@ public class MQClientInstance { return null; } - public FindBrokerResult findBrokerAddressInSubscribe(final MessageQueue mq, final long brokerId, final boolean onlyThisBroker) { - String brokerName = getBrokerNameFromMessageQueue(mq); - if (brokerName == null) { - return null; - } else { - return findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker); - } - } public FindBrokerResult findBrokerAddressInSubscribe( final String brokerName, final long brokerId, final boolean onlyThisBroker ) { + if (brokerName == null) { + return null; + } String brokerAddr = null; boolean slave = false; boolean found = false; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 578c8437d690300a03216b26a5c1fbec60b2ee8b..52a2d9c7ccdccf390dd449a36115f5b103891633 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -16,16 +16,12 @@ */ package org.apache.rocketmq.client.impl.producer; -import com.alibaba.fastjson.JSON; -import com.google.common.base.Objects; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Random; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -45,7 +41,6 @@ import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.exception.MQRedirectException; import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.hook.CheckForbiddenContext; import org.apache.rocketmq.client.hook.CheckForbiddenHook; @@ -67,7 +62,6 @@ import org.apache.rocketmq.client.producer.RequestFutureTable; import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendResultForLogicalQueue; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionListener; @@ -88,13 +82,9 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.utils.CorrelationIdUtil; import org.apache.rocketmq.logging.InternalLogger; @@ -731,14 +721,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); } - String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); - SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); @@ -1342,7 +1331,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); - final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue()); + final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue())); + final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset());