From 8422e74fa5d1f6c8a29770e568d9bb228ac1f6c7 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Fri, 1 Jun 2018 10:04:56 +0800 Subject: [PATCH] Shutdown all thread pools when broker quits --- .../rocketmq/broker/out/BrokerOuterAPI.java | 1 + .../rocketmq/producer/AbstractOMSProducer.java | 16 +++++++++++++--- .../rocketmq/store/stats/BrokerStatsManager.java | 1 + 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 262e2d2c..2825a34c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -80,6 +80,7 @@ public class BrokerOuterAPI { public void shutdown() { this.remotingClient.shutdown(); + this.brokerOuterExecutor.shutdown(); } public String fetchNameServerAddr() { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 53fc0f90..3db85904 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -99,9 +99,19 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { - MQBrokerException brokerException = (MQBrokerException) e.getCause(); - return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", - topic, msgId, brokerException.getErrorMessage()), e); + if (e.getCause() instanceof MQBrokerException) { + MQBrokerException brokerException = (MQBrokerException) e.getCause(); + return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", + topic, msgId, brokerException.getErrorMessage()), e); + } + + if (e.getCause() instanceof RemotingConnectException) { + RemotingConnectException connectException = (RemotingConnectException)e.getCause(); + return new OMSRuntimeException("-1", + String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s", + topic, msgId, connectException.getMessage()), + e); + } } } // Exception thrown by local. diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index ac8ae3cb..4adbed76 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -109,6 +109,7 @@ public class BrokerStatsManager { public void shutdown() { this.scheduledExecutorService.shutdown(); + this.commercialExecutor.shutdown(); } public StatsItem getStatsItem(final String statsName, final String statsKey) { -- GitLab