From 01a0eb0088860b10ae0cff257e4f25a0c59cd44c Mon Sep 17 00:00:00 2001 From: Jaskey Date: Wed, 15 Feb 2017 22:17:47 +0800 Subject: [PATCH] Fix possible NullPointerException when retry in send Async way --- .../rocketmq/client/impl/MQClientAPIImpl.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bdce8830..6119e248 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -438,34 +438,40 @@ public class MQClientAPIImpl { ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { - MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); - String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); + String retryBrokerName = brokerName;//by default, it will send to the same broker + if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send + MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); + retryBrokerName = mqChosen.getBrokerName(); + } + String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); + retryBrokerName); try { request.setOpaque(RemotingCommand.createNewRequestId()); - sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } catch (RemotingTooMuchRequestException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } } else { + if (context != null) { context.setException(e); context.getProducer().executeSendMessageHookAfter(context); } + try { sendCallback.onException(e); } catch (Exception ignored) { -- GitLab