diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bdce883050030d38983ebd63202624370d8700a2..6119e2483eb79c565b060874f5b103dbe5c2500a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -438,34 +438,40 @@ public class MQClientAPIImpl { ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { - MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); - String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); + String retryBrokerName = brokerName;//by default, it will send to the same broker + if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send + MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); + retryBrokerName = mqChosen.getBrokerName(); + } + String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); + retryBrokerName); try { request.setOpaque(RemotingCommand.createNewRequestId()); - sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } catch (RemotingTooMuchRequestException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } } else { + if (context != null) { context.setException(e); context.getProducer().executeSendMessageHookAfter(context); } + try { sendCallback.onException(e); } catch (Exception ignored) {