diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 2495d9d6e585de9f5cb9fbc0e831fb33fab90a1c..a237bf61acfa93bf10d39cbf071ed19032d2c991 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -209,7 +209,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr()); } public void unregisterBrokerAll( @@ -255,7 +255,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); } public List needRegister( @@ -338,7 +338,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public ConsumerOffsetSerializeWrapper getAllConsumerOffset( @@ -355,7 +355,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public String getAllDelayOffset( @@ -372,7 +372,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public SubscriptionGroupWrapper getAllSubscriptionGroupConfig( @@ -389,7 +389,7 @@ public class BrokerOuterAPI { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void registerRPCHook(RPCHook rpcHook) { diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java index e4f2c8df85022b66f6345cd7c28edff8af44d6aa..f07a38b81f0a3689015eed3ac63f6f2fb12cb457 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java @@ -23,12 +23,22 @@ public class MQBrokerException extends Exception { private static final long serialVersionUID = 5975020272601250368L; private final int responseCode; private final String errorMessage; + private final String brokerAddr; public MQBrokerException(int responseCode, String errorMessage) { super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage)); + + errorMessage)); this.responseCode = responseCode; this.errorMessage = errorMessage; + this.brokerAddr = null; + } + + public MQBrokerException(int responseCode, String errorMessage, String brokerAddr) { + super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " + + errorMessage + (brokerAddr != null ? " BROKER: " + brokerAddr : ""))); + this.responseCode = responseCode; + this.errorMessage = errorMessage; + this.brokerAddr = brokerAddr; } public int getResponseCode() { @@ -38,4 +48,8 @@ public class MQBrokerException extends Exception { public String getErrorMessage() { return errorMessage; } + + public String getBrokerAddr() { + return brokerAddr; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c64d7c568ec4c250fdee1c6c5749f65ca9d3d3fb..7a4d55654105b7449bebe4ca4a7eb17933466c1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -390,7 +390,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } @@ -414,7 +414,7 @@ public class MQClientAPIImpl { default: break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } @@ -502,7 +502,7 @@ public class MQClientAPIImpl { ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; - return this.processSendResponse(brokerName, msg, response); + return this.processSendResponse(brokerName, msg, response,addr); } private void sendMessageAsync( @@ -528,7 +528,7 @@ public class MQClientAPIImpl { if (null == sendCallback && response != null) { try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); if (context != null && sendResult != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); @@ -542,7 +542,7 @@ public class MQClientAPIImpl { if (response != null) { try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); assert sendResult != null; if (context != null) { context.setSendResult(sendResult); @@ -641,7 +641,8 @@ public class MQClientAPIImpl { private SendResult processSendResponse( final String brokerName, final Message msg, - final RemotingCommand response + final RemotingCommand response, + final String addr ) throws MQBrokerException, RemotingCommandException { SendStatus sendStatus; switch (response.getCode()) { @@ -662,7 +663,7 @@ public class MQClientAPIImpl { break; } default: { - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } } @@ -741,7 +742,7 @@ public class MQClientAPIImpl { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { - PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); + PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; pullCallback.onSuccess(pullResult); } catch (Exception e) { @@ -768,11 +769,12 @@ public class MQClientAPIImpl { ) throws RemotingException, InterruptedException, MQBrokerException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; - return this.processPullResponse(response); + return this.processPullResponse(response, addr); } private PullResult processPullResponse( - final RemotingCommand response) throws MQBrokerException, RemotingCommandException { + final RemotingCommand response, + final String addr) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: @@ -789,7 +791,7 @@ public class MQClientAPIImpl { break; default: - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } PullMessageResponseHeader responseHeader = @@ -822,7 +824,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, @@ -847,7 +849,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) @@ -871,7 +873,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public List getConsumerIdListByGroup( @@ -898,7 +900,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) @@ -922,7 +924,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, @@ -947,7 +949,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public long queryConsumerOffset( @@ -971,7 +973,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void updateConsumerOffset( @@ -992,7 +994,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void updateConsumerOffsetOneway( @@ -1024,7 +1026,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void unregisterClient( @@ -1050,7 +1052,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void endTransactionOneway( @@ -1116,7 +1118,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public Set lockBatchMQ( @@ -1138,7 +1140,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void unlockBatchMQ( @@ -1164,7 +1166,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } } @@ -1187,7 +1189,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) @@ -1217,7 +1219,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, @@ -1239,7 +1241,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, @@ -1261,7 +1263,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException, @@ -1279,7 +1281,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis) @@ -1301,7 +1303,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } } @@ -1320,7 +1322,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public ClusterInfo getBrokerClusterInfo( @@ -1670,7 +1672,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public List queryConsumeTimeSpan(final String addr, final String topic, final String group, @@ -1694,7 +1696,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis) @@ -1745,7 +1747,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public TopicList getSystemTopicList( @@ -2108,7 +2110,7 @@ public class MQClientAPIImpl { default: break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); } public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, @@ -2127,7 +2129,7 @@ public class MQClientAPIImpl { break; } - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } public void updateNameServerConfig(final Properties properties, final List nameServers, long timeoutMillis) diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 64d64f29a9489fbc77577c71170e552718be3263..f762910a40118cd7b99c9c5c943d5a9ba226954b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -84,7 +84,7 @@ public class RemoteBrokerOffsetStoreTest { offsetStore.updateOffset(messageQueue, 1024, false); - doThrow(new MQBrokerException(-1, "")) + doThrow(new MQBrokerException(-1, "", null)) .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong()); assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);