提交 f308cd30 编写于 作者: D dongeforever

Convert mq to broker name

上级 c0ffc5bd
......@@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
......@@ -191,10 +190,10 @@ public class MQAdminImpl {
if (logicalQueueRouteData != null) {
mq = logicalQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
......@@ -227,10 +226,10 @@ public class MQAdminImpl {
previousQueueRouteData = maxQueueRouteData;
mq = maxQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
......@@ -256,10 +255,10 @@ public class MQAdminImpl {
mq = minQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
......@@ -298,10 +297,10 @@ public class MQAdminImpl {
mq = minQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
......
......@@ -718,11 +718,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String desBrokerName = brokerName;
if (mq != null) {
String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
if (tmpBrokerName != null) {
desBrokerName = tmpBrokerName;
}
}
String brokerAddr = null;
if (null != mq) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
} else if (null != brokerName) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null != desBrokerName) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
} else {
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
}
......
......@@ -1055,16 +1055,11 @@ public class MQClientInstance {
return mq.getBrokerName();
}
public FindBrokerResult findBrokerAddressInAdmin(final MessageQueue mq) {
String brokerName = getBrokerNameFromMessageQueue(mq);
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
if (brokerName == null) {
return null;
} else {
return findBrokerAddressInAdmin(brokerName);
}
}
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
......@@ -1094,16 +1089,11 @@ public class MQClientInstance {
return null;
}
public String findBrokerAddressInPublish(final MessageQueue mq) {
String brokerName = getBrokerNameFromMessageQueue(mq);
public String findBrokerAddressInPublish(final String brokerName) {
if (brokerName == null) {
return null;
} else {
return findBrokerAddressInPublish(brokerName);
}
}
//This is used for retry only
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
......@@ -1112,20 +1102,15 @@ public class MQClientInstance {
return null;
}
public FindBrokerResult findBrokerAddressInSubscribe(final MessageQueue mq, final long brokerId, final boolean onlyThisBroker) {
String brokerName = getBrokerNameFromMessageQueue(mq);
if (brokerName == null) {
return null;
} else {
return findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker);
}
}
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
if (brokerName == null) {
return null;
}
String brokerAddr = null;
boolean slave = false;
boolean found = false;
......
......@@ -16,16 +16,12 @@
*/
package org.apache.rocketmq.client.impl.producer;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Objects;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
......@@ -45,7 +41,6 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
......@@ -67,7 +62,6 @@ import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
......@@ -88,13 +82,9 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -731,14 +721,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
......@@ -1342,7 +1331,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue());
final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册