From c47978d62f07c0d7864d4567cf4a8528466ba5df Mon Sep 17 00:00:00 2001 From: huzongtang Date: Sat, 2 Feb 2019 15:22:47 +0800 Subject: [PATCH] [ISSUE #747]Optimize and adjust codes on Snode. --- .../apache/rocketmq/common}/SnodeConfig.java | 8 +++--- .../rocketmq/snode/SnodeController.java | 2 +- .../apache/rocketmq/snode/SnodeStartup.java | 26 +++++++++++++++++-- .../client/ClientHousekeepingService.java | 2 +- .../processor/ConsumerManageProcessor.java | 2 +- .../DefaultMqttMessageProcessor.java | 4 +++ .../rocketmq/snode/service/NnodeService.java | 2 +- .../snode/service/impl/NnodeServiceImpl.java | 2 +- .../service/impl/ScheduledServiceImpl.java | 2 +- .../rocketmq/snode/SnodeControllerTest.java | 2 +- .../DefaultMqttMessageProcessorTest.java | 2 +- .../MqttConnectMessageHandlerTest.java | 2 +- .../processor/SendMessageProcessorTest.java | 2 +- .../snode/service/EnodeServiceImplTest.java | 2 +- .../snode/service/NnodeServiceImplTest.java | 2 +- .../service/SlowConsumerServiceImplTest.java | 2 +- 16 files changed, 45 insertions(+), 19 deletions(-) rename {snode/src/main/java/org/apache/rocketmq/snode/config => common/src/main/java/org/apache/rocketmq/common}/SnodeConfig.java (98%) diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java similarity index 98% rename from snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java rename to common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index 6c51fadb..fd3b299b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -14,21 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.config; +package org.apache.rocketmq.common; import java.net.InetAddress; import java.net.UnknownHostException; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; -import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY; public class SnodeConfig { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 43b0b5f6..b47e3b54 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; @@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index d6060a90..232def9c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -26,11 +26,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.srvutil.ServerUtil; import org.slf4j.LoggerFactory; @@ -123,19 +124,31 @@ public class SnodeStartup { nettyClientConfig, snodeConfig); + boolean initResult = snodeController.initialize(); + if (!initResult) { + snodeController.shutdown(); + System.exit(-3); + } + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { + log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { this.hasShutdown = true; + long beginTime = System.currentTimeMillis(); snodeController.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - beginTime; + log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } - })); + },"ShutdownHook")); return snodeController; } @@ -143,6 +156,15 @@ public class SnodeStartup { Option opt = new Option("c", "configFile", true, "SNode config properties file"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "printImportantConfig", false, "Print important config item"); + opt.setRequired(false); + options.addOption(opt); + return options; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 1db3f9ce..910d5be1 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.constant.SnodeConstant; public class ClientHousekeepingService implements ChannelEventListener { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final ClientManager producerManager; private final ClientManager consumerManager; private final ClientManager iotClientManager; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index 4cd54d6b..0bb06478 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; public class ConsumerManageProcessor implements RequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 1a3ced1a..1ca672ff 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private Map type2handler = new HashMap<>(); private static final int MIN_AVAILABLE_VERSION = 3; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java index 84f224b2..f5a66000 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service; import java.util.Set; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.snode.config.SnodeConfig; public interface NnodeService { /** diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index 8dcdf0c6..57585552 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; @@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.NnodeService; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 685af3f0..0e3479ac 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; @@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.ScheduledService; public class ScheduledServiceImpl implements ScheduledService { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java index ff5235b2..414f06bb 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.snode; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 2a2a4459..71863068 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; @@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java index b610057f..8846ef1f 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index a7a26672..2573f027 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; @@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.NnodeService; import org.junit.Before; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java index c228a83f..4255c773 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index 45a19dd8..51ff8ff5 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java index 6a25009e..5338a23d 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -16,12 +16,12 @@ */ package org.apache.rocketmq.snode.service; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.junit.Before; import org.junit.Test; -- GitLab