diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java similarity index 98% rename from snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java rename to common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index 6c51fadb765d43a9d02d450119d95144e2549f59..fd3b299b13e9ca3ac44f280ed0f0ca5e6978bcbf 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -14,21 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.config; +package org.apache.rocketmq.common; import java.net.InetAddress; import java.net.UnknownHostException; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; -import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY; public class SnodeConfig { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 43b0b5f62de8f8b0aa46cd3110c96e9407ead217..b47e3b544e25dedb8509abf72b8164faf0c3256f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; @@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index d6060a90c6a58262a4d1798fcba5e342e76e9ed1..232def9ca35ce031fa7c696bca75d9db62c49eb6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -26,11 +26,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.srvutil.ServerUtil; import org.slf4j.LoggerFactory; @@ -123,19 +124,31 @@ public class SnodeStartup { nettyClientConfig, snodeConfig); + boolean initResult = snodeController.initialize(); + if (!initResult) { + snodeController.shutdown(); + System.exit(-3); + } + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { + log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { this.hasShutdown = true; + long beginTime = System.currentTimeMillis(); snodeController.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - beginTime; + log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } - })); + },"ShutdownHook")); return snodeController; } @@ -143,6 +156,15 @@ public class SnodeStartup { Option opt = new Option("c", "configFile", true, "SNode config properties file"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "printImportantConfig", false, "Print important config item"); + opt.setRequired(false); + options.addOption(opt); + return options; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 1db3f9cedf2ad2407aa55002deca480b0ca9b770..910d5be189044b0ef5b8ca0c9f277798710f8ac9 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.constant.SnodeConstant; public class ClientHousekeepingService implements ChannelEventListener { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final ClientManager producerManager; private final ClientManager consumerManager; private final ClientManager iotClientManager; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index 4cd54d6ba2b9fdf0fa07e0794da4913ca9d8722b..0bb064787b4affbf8e96ffff7bed0c04af3e1fd4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; public class ConsumerManageProcessor implements RequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 1a3ced1ac10fc83ef03215187d10c44ba9dbf0d2..1ca672ffc12006567601d39a9f943422a5746233 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private Map type2handler = new HashMap<>(); private static final int MIN_AVAILABLE_VERSION = 3; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java index 84f224b234a6743ed3a4e88c739284da8c3b6d55..f5a66000c0565903f726e922598e346b0bf0078f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service; import java.util.Set; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.snode.config.SnodeConfig; public interface NnodeService { /** diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index 8dcdf0c61dba53ebe9fd71455dd7679c2cda9098..575855523c87eedc64b9a181c40f7a33f70b483f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; @@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.NnodeService; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 685af3f05abb2533772dcc5613d59a7b65f2dd0c..0e3479aca81d3a78affb825337d04cbc2e1b0acc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; @@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.ScheduledService; public class ScheduledServiceImpl implements ScheduledService { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java index ff5235b2dff234aa318911628175537282d097df..414f06bb9e8220569ddb5d23f8b9262d82783b9e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.snode; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 2a2a4459c28f3103c12ef8f9e9e4bab1652479fa..71863068be97c02aa1e90989e63c288a821a91dc 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; @@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java index b610057fecfd0c2abccc67eac1a7bb3da447ce5e..8846ef1f1d8cf4ccc9d5675e8fff8b87107fa6ed 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index a7a26672bd89e0303970e8374b602aa69826b1ab..2573f0272f24b5317b4a2d5c1afa95fac9707737 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; @@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.NnodeService; import org.junit.Before; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java index c228a83f733c48837c8ff242562b85189dac920e..4255c773afb1a34f2213c06036fcdd6c4a3a053b 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index 45a19dd8fce8e0cd9214b9542121a340b947f875..51ff8ff53754d9721478c18cc1b19d68e6ff0e2e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.junit.Before; import org.junit.Test; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java index 6a25009e04f8db0ef02a8e84fe81993359b602e4..5338a23dec28c55c687d787794e27d63b00f3c66 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -16,12 +16,12 @@ */ package org.apache.rocketmq.snode.service; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; -import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.junit.Before; import org.junit.Test;