提交 08618d51 编写于 作者: D dongeforever

Finish the findBrokerAddr for admin publish subscribe

上级 f308cd30
...@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
} }
if (findBrokerResult != null) { if (findBrokerResult != null) {
...@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
} }
if (findBrokerResult != null) { if (findBrokerResult != null) {
......
...@@ -578,7 +578,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -578,7 +578,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try { try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) String destBrokerName = brokerName;
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(destBrokerName)) {
destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPullConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
}
String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
if (UtilAll.isBlank(consumerGroup)) { if (UtilAll.isBlank(consumerGroup)) {
......
...@@ -725,6 +725,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -725,6 +725,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
desBrokerName = tmpBrokerName; desBrokerName = tmpBrokerName;
} }
} }
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
}
String brokerAddr = null; String brokerAddr = null;
if (null != desBrokerName) { if (null != desBrokerName) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
...@@ -765,15 +769,21 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -765,15 +769,21 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs); long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
String topic = message.getTopic(); String topic = message.getTopic();
String desBrokerName = brokerName;
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
}
FindBrokerResult FindBrokerResult
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
} }
if (findBrokerResult == null) { if (findBrokerResult == null) {
log.error("The broker[" + brokerName + "] not exist"); log.error("The broker[" + desBrokerName + "] not exist");
return; return;
} }
...@@ -806,11 +816,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -806,11 +816,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo); String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs); String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs); int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
String desBrokerName = brokerName;
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
}
FindBrokerResult FindBrokerResult
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
} }
if (findBrokerResult != null) { if (findBrokerResult != null) {
ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader(); ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
...@@ -820,10 +836,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -820,10 +836,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
requestHeader.setConsumerGroup(consumerGroup); requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setExtraInfo(extraInfo); requestHeader.setExtraInfo(extraInfo);
requestHeader.setInvisibleTime(invisibleTime); requestHeader.setInvisibleTime(invisibleTime);
//here the broker should be polished
this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback); this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
return; return;
} }
throw new MQClientException("The broker[" + brokerName + "] not exist", null); throw new MQClientException("The broker[" + desBrokerName + "] not exist", null);
} }
public int getMaxReconsumeTimes() { public int getMaxReconsumeTimes() {
......
...@@ -239,12 +239,12 @@ public class PullAPIWrapper { ...@@ -239,12 +239,12 @@ public class PullAPIWrapper {
int queueId = mq.getQueueId(); int queueId = mq.getQueueId();
FindBrokerResult findBrokerResult = FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false); this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
findBrokerResult = findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false); this.recalculatePullFromWhichNode(mq), false);
} }
...@@ -373,10 +373,10 @@ public class PullAPIWrapper { ...@@ -373,10 +373,10 @@ public class PullAPIWrapper {
public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup, public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression) long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (null == findBrokerResult) { if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
} }
if (findBrokerResult != null) { if (findBrokerResult != null) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
......
...@@ -76,7 +76,7 @@ public abstract class RebalanceImpl { ...@@ -76,7 +76,7 @@ public abstract class RebalanceImpl {
} }
public void unlock(final MessageQueue mq, final boolean oneway) { public void unlock(final MessageQueue mq, final boolean oneway) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (findBrokerResult != null) { if (findBrokerResult != null) {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup); requestBody.setConsumerGroup(this.consumerGroup);
...@@ -141,7 +141,8 @@ public abstract class RebalanceImpl { ...@@ -141,7 +141,8 @@ public abstract class RebalanceImpl {
continue; continue;
} }
Set<MessageQueue> mqs = result.get(mq.getBrokerName()); String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
Set<MessageQueue> mqs = result.get(destBrokerName);
if (null == mqs) { if (null == mqs) {
mqs = new HashSet<MessageQueue>(); mqs = new HashSet<MessageQueue>();
result.put(mq.getBrokerName(), mqs); result.put(mq.getBrokerName(), mqs);
...@@ -154,7 +155,7 @@ public abstract class RebalanceImpl { ...@@ -154,7 +155,7 @@ public abstract class RebalanceImpl {
} }
public boolean lock(final MessageQueue mq) { public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (findBrokerResult != null) { if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody(); LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup); requestBody.setConsumerGroup(this.consumerGroup);
......
...@@ -721,11 +721,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -721,11 +721,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo, final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
if (null == brokerAddr) { if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic()); tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
} }
SendMessageContext context = null; SendMessageContext context = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册