提交 c47978d6 编写于 作者: H huzongtang

[ISSUE #747]Optimize and adjust codes on Snode.

上级 e6d0afbf
......@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.config;
package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
public class SnodeConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
......
......@@ -26,11 +26,13 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
......@@ -123,19 +124,31 @@ public class SnodeStartup {
nettyClientConfig,
snodeConfig);
boolean initResult = snodeController.initialize();
if (!initResult) {
snodeController.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
snodeController.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}));
},"ShutdownHook"));
return snodeController;
}
......@@ -143,6 +156,15 @@ public class SnodeStartup {
Option opt = new Option("c", "configFile", true, "SNode config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
......
......@@ -29,7 +29,7 @@ import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.constant.SnodeConstant;
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final ClientManager producerManager;
private final ClientManager consumerManager;
private final ClientManager iotClientManager;
......
......@@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class ConsumerManageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
......
......@@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
......@@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private static final int MIN_AVAILABLE_VERSION = 3;
......
......@@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.snode.config.SnodeConfig;
public interface NnodeService {
/**
......
......@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.NnodeService;
......
......@@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
......@@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.ScheduledService;
public class ScheduledServiceImpl implements ScheduledService {
......
......@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
......@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before;
import org.junit.Test;
......
......@@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
import org.junit.runner.RunWith;
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
......@@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.junit.Before;
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.junit.Before;
import org.junit.Test;
......
......@@ -16,12 +16,12 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.junit.Before;
import org.junit.Test;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册