diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 262e2d2c911ff0ae7db640f92a1db982793fe870..2825a34cd725416cfaaa030fd9a88d2a2051f8b1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -80,6 +80,7 @@ public class BrokerOuterAPI { public void shutdown() { this.remotingClient.shutdown(); + this.brokerOuterExecutor.shutdown(); } public String fetchNameServerAddr() { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 53fc0f9009f4399f76c3c4936daea607a04f5b05..3db859048f6d0eb056708427af36a2a00a6a3dc2 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -99,9 +99,19 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { - MQBrokerException brokerException = (MQBrokerException) e.getCause(); - return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", - topic, msgId, brokerException.getErrorMessage()), e); + if (e.getCause() instanceof MQBrokerException) { + MQBrokerException brokerException = (MQBrokerException) e.getCause(); + return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", + topic, msgId, brokerException.getErrorMessage()), e); + } + + if (e.getCause() instanceof RemotingConnectException) { + RemotingConnectException connectException = (RemotingConnectException)e.getCause(); + return new OMSRuntimeException("-1", + String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s", + topic, msgId, connectException.getMessage()), + e); + } } } // Exception thrown by local. diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index ac8ae3cb6457234376aa1c91230a41472530b92d..4adbed76abc7513afd3e4428872d5d3d490f23a9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -109,6 +109,7 @@ public class BrokerStatsManager { public void shutdown() { this.scheduledExecutorService.shutdown(); + this.commercialExecutor.shutdown(); } public StatsItem getStatsItem(final String statsName, final String statsKey) {