diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
index 270a74c231defb73755dc17fd606097432c6d0b1..fc9128d15f31d95eec218f6c4521c217cad1eb3d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
@@ -16,32 +16,32 @@
*/
package org.apache.rocketmq.common;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
public class MqttConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ private ServerConfig mqttServerConfig;
- private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
+ private ClientConfig mqttClientConfig;
- private int snodeHandleMqttMessageMinPoolSize = 10;
+ private int handleMqttThreadPoolQueueCapacity = 10000;
- private int snodeHandleMqttMessageMaxPoolSize = 20;
+ private int handleMqttMessageMinPoolSize = 10;
- private long houseKeepingInterval = 10 * 1000;
+ private int handleMqttMessageMaxPoolSize = 20;
- private int snodePushMqttMessageMinPoolSize = 10;
+ private int pushMqttMessageMinPoolSize = 10;
- private int snodePushMqttMessageMaxPoolSize = 20;
+ private int pushMqttMessageMaxPoolSize = 20;
- private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
+ private int pushMqttMessageThreadPoolQueueCapacity = 10000;
private int listenPort = 1883;
/**
@@ -50,119 +50,84 @@ public class MqttConfig {
@ImportantField
private boolean aclEnable = false;
- public long getHouseKeepingInterval() {
- return houseKeepingInterval;
- }
-
- public void setHouseKeepingInterval(long houseKeepingInterval) {
- this.houseKeepingInterval = houseKeepingInterval;
- }
-
- /**
- * This configurable item defines interval of topics registration of broker to name server. Allowing values are
- * between 10, 000 and 60, 000 milliseconds.
- */
- private int registerNameServerPeriod = 1000 * 30;
-
- public int getRegisterNameServerPeriod() {
- return registerNameServerPeriod;
- }
-
- public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
- this.registerNameServerPeriod = registerNameServerPeriod;
- }
-
- @ImportantField
- private boolean fetchNamesrvAddrByAddressServer = false;
-
- public static String localHostName() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- log.error("Failed to obtain the host name", e);
- }
-
- return "DEFAULT_SNODE";
+ public int getListenPort() {
+ return listenPort;
}
- public boolean isFetchNamesrvAddrByAddressServer() {
- return fetchNamesrvAddrByAddressServer;
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
}
- public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
- this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
- }
- public int getListenPort() {
- return listenPort;
+ public boolean isAclEnable() {
+ return aclEnable;
}
- public String getRocketmqHome() {
- return rocketmqHome;
+ public void setAclEnable(boolean aclEnable) {
+ this.aclEnable = aclEnable;
}
- public void setRocketmqHome(String rocketmqHome) {
- this.rocketmqHome = rocketmqHome;
+ public ServerConfig getMqttServerConfig() {
+ return mqttServerConfig;
}
- public void setListenPort(int listenPort) {
- this.listenPort = listenPort;
+ public void setMqttServerConfig(ServerConfig mqttServerConfig) {
+ this.mqttServerConfig = mqttServerConfig;
}
- public int getSnodeHandleMqttThreadPoolQueueCapacity() {
- return snodeHandleMqttThreadPoolQueueCapacity;
+ public ClientConfig getMqttClientConfig() {
+ return mqttClientConfig;
}
- public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
- this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
+ public void setMqttClientConfig(ClientConfig mqttClientConfig) {
+ this.mqttClientConfig = mqttClientConfig;
}
- public int getSnodeHandleMqttMessageMinPoolSize() {
- return snodeHandleMqttMessageMinPoolSize;
+ public int getHandleMqttThreadPoolQueueCapacity() {
+ return handleMqttThreadPoolQueueCapacity;
}
- public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
- this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
+ public void setHandleMqttThreadPoolQueueCapacity(int handleMqttThreadPoolQueueCapacity) {
+ this.handleMqttThreadPoolQueueCapacity = handleMqttThreadPoolQueueCapacity;
}
- public int getSnodeHandleMqttMessageMaxPoolSize() {
- return snodeHandleMqttMessageMaxPoolSize;
+ public int getHandleMqttMessageMinPoolSize() {
+ return handleMqttMessageMinPoolSize;
}
- public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
- this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
+ public void setHandleMqttMessageMinPoolSize(int handleMqttMessageMinPoolSize) {
+ this.handleMqttMessageMinPoolSize = handleMqttMessageMinPoolSize;
}
- public int getSnodePushMqttMessageMinPoolSize() {
- return snodePushMqttMessageMinPoolSize;
+ public int getHandleMqttMessageMaxPoolSize() {
+ return handleMqttMessageMaxPoolSize;
}
- public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
- this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
+ public void setHandleMqttMessageMaxPoolSize(int handleMqttMessageMaxPoolSize) {
+ this.handleMqttMessageMaxPoolSize = handleMqttMessageMaxPoolSize;
}
- public int getSnodePushMqttMessageMaxPoolSize() {
- return snodePushMqttMessageMaxPoolSize;
+ public int getPushMqttMessageMinPoolSize() {
+ return pushMqttMessageMinPoolSize;
}
- public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
- this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
+ public void setPushMqttMessageMinPoolSize(int pushMqttMessageMinPoolSize) {
+ this.pushMqttMessageMinPoolSize = pushMqttMessageMinPoolSize;
}
- public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
- return snodePushMqttMessageThreadPoolQueueCapacity;
+ public int getPushMqttMessageMaxPoolSize() {
+ return pushMqttMessageMaxPoolSize;
}
- public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
- this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
+ public void setPushMqttMessageMaxPoolSize(int pushMqttMessageMaxPoolSize) {
+ this.pushMqttMessageMaxPoolSize = pushMqttMessageMaxPoolSize;
}
- public boolean isAclEnable() {
- return aclEnable;
+ public int getPushMqttMessageThreadPoolQueueCapacity() {
+ return pushMqttMessageThreadPoolQueueCapacity;
}
- public void setAclEnable(boolean aclEnable) {
- this.aclEnable = aclEnable;
+ public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) {
+ this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity;
}
-
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index fc1b7e6a309833de0ca19c5d0320fe7d75ab9729..831219bddc70450ed6fdf74da76b556577c08a73 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -60,12 +60,6 @@ public class SnodeConfig {
private int snodeSendMessageMaxPoolSize = 20;
- private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
-
- private int snodeHandleMqttMessageMinPoolSize = 10;
-
- private int snodeHandleMqttMessageMaxPoolSize = 20;
-
private int snodeHeartBeatCorePoolSize = 1;
private int snodeHeartBeatMaxPoolSize = 2;
@@ -88,12 +82,6 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000;
- private int snodePushMqttMessageMinPoolSize = 10;
-
- private int snodePushMqttMessageMaxPoolSize = 20;
-
- private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
-
private int slowConsumerThreshold = 1024;
private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
@@ -277,30 +265,6 @@ public class SnodeConfig {
this.snodeId = snodeId;
}
- public int getSnodeHandleMqttThreadPoolQueueCapacity() {
- return snodeHandleMqttThreadPoolQueueCapacity;
- }
-
- public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
- this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
- }
-
- public int getSnodeHandleMqttMessageMinPoolSize() {
- return snodeHandleMqttMessageMinPoolSize;
- }
-
- public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
- this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
- }
-
- public int getSnodeHandleMqttMessageMaxPoolSize() {
- return snodeHandleMqttMessageMaxPoolSize;
- }
-
- public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
- this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
- }
-
public String getSnodeName() {
return snodeName;
}
@@ -365,30 +329,6 @@ public class SnodeConfig {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
- public int getSnodePushMqttMessageMinPoolSize() {
- return snodePushMqttMessageMinPoolSize;
- }
-
- public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
- this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
- }
-
- public int getSnodePushMqttMessageMaxPoolSize() {
- return snodePushMqttMessageMaxPoolSize;
- }
-
- public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
- this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
- }
-
- public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
- return snodePushMqttMessageThreadPoolQueueCapacity;
- }
-
- public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
- this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
- }
-
public String getSendMessageInterceptorPath() {
return sendMessageInterceptorPath;
}
diff --git a/pom.xml b/pom.xml
index 8b0ce4223e9a455928f1714f756c40750b74476e..3899247f27951b60e69f9a2c7247a7f6579479b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -653,6 +653,11 @@
org.eclipse.paho.client.mqttv3
1.2.0
+
+ com.google.code.gson
+ gson
+ LATEST
+
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 9abb2b21d78f2ecebe8c601e1648a2c70c0ed974..26885a565f2ccf194fbb490356f89af1181835e9 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -15,7 +15,8 @@
limitations under the License.
-->
-
+
org.apache.rocketmq
rocketmq-all
@@ -38,8 +39,8 @@
fastjson
- org.msgpack
- msgpack
+ org.msgpack
+ msgpack
io.netty
@@ -62,5 +63,9 @@
com.google.guava
guava
+
+ com.google.code.gson
+ gson
+
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java
index 97c3d3eb39b8be92022bcc2a0bab6b6ff5b222b3..dda2528fabf56de653208588d6c36bdc5be66dd9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java
@@ -20,7 +20,6 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
public class ServerConfig implements Cloneable {
private int listenPort = 8888;
- private int mqttListenPort = 1883;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
@@ -76,14 +75,6 @@ public class ServerConfig implements Cloneable {
this.listenPort = listenPort;
}
- public int getMqttListenPort() {
- return mqttListenPort;
- }
-
- public void setMqttListenPort(int mqttListenPort) {
- this.mqttListenPort = mqttListenPort;
- }
-
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 7c008fafba1d689c4010344efa9576da4a779b65..e823a69c767d1ec8646cc769b65e97e1f5f13dc9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -80,8 +80,6 @@ public class RemotingCommand {
private transient byte[] body;
- private Object payload;
-
public RemotingCommand() {
}
@@ -261,14 +259,6 @@ public class RemotingCommand {
this.body = body;
}
- public Object getPayload() {
- return payload;
- }
-
- public void setPayload(Object payload) {
- this.payload = payload;
- }
-
public HashMap getExtFields() {
return extFields;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java
index 05dd1b258cd3f1212921a3ed5d3da201f36c857e..725cfe74fbce4c19ec38b07611fef63b8ac252dd 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java
@@ -60,7 +60,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
- private ClientConfig nettyClientConfig;
+ private ClientConfig mqttClientConfig;
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker;
@@ -76,33 +76,38 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
public MqttRemotingClient() {
super();
+ loadProperties();
}
- public MqttRemotingClient(final ClientConfig nettyClientConfig) {
- this(nettyClientConfig, null);
+ private void loadProperties() {
+
+ }
+
+ public MqttRemotingClient(final ClientConfig mqttClientConfig) {
+ this(mqttClientConfig, null);
}
- public MqttRemotingClient(final ClientConfig nettyClientConfig,
+ public MqttRemotingClient(final ClientConfig mqttClientConfig,
final ChannelEventListener channelEventListener) {
- super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
- init(nettyClientConfig, channelEventListener);
+ super(mqttClientConfig.getClientOnewaySemaphoreValue(), mqttClientConfig.getClientAsyncSemaphoreValue());
+ init(mqttClientConfig, channelEventListener);
}
@Override
- public RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
- this.nettyClientConfig = nettyClientConfig;
+ public RemotingClient init(ClientConfig mqttClientConfig, ChannelEventListener channelEventListener) {
+ this.mqttClientConfig = mqttClientConfig;
this.channelEventListener = channelEventListener;
- this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
- nettyClientConfig.getClientWorkerThreads()));
+ this.eventLoopGroupWorker = new NioEventLoopGroup(mqttClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+ mqttClientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
- nettyClientConfig.getClientCallbackExecutorThreads(),
- 10000, "Remoting-PublicExecutor", true);
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
- ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
- if (nettyClientConfig.isUseTLS()) {
+ mqttClientConfig.getClientCallbackExecutorThreads(),
+ 10000, "MqttRemoting-PublicExecutor", true);
+ this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(mqttClientConfig.getClientWorkerThreads(),
+ ThreadUtils.newGenericThreadFactory("MqttNettyClientWorkerThreads", mqttClientConfig.getClientWorkerThreads()));
+ if (mqttClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
- log.info("SSL enabled for client");
+ log.info("SSL enabled for mqtt client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
@@ -123,14 +128,14 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
- .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
- .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mqttClientConfig.getConnectTimeoutMillis())
+ .option(ChannelOption.SO_SNDBUF, mqttClientConfig.getClientSocketSndBufSize())
+ .option(ChannelOption.SO_RCVBUF, mqttClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
- if (nettyClientConfig.isUseTLS()) {
+ if (mqttClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
@@ -142,7 +147,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
defaultEventExecutorGroup,
MqttEncoder.INSTANCE,
new MqttDecoder(),
- new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+ new IdleStateHandler(0, 0, mqttClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
@@ -199,7 +204,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
throw (RemotingSendRequestException) remotingException;
}
if (remotingException instanceof RemotingTimeoutException) {
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+ if (mqttClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java
index 6d4fc20a875ae776d06a8eee0d1c542e1c7f3622..3a5c688ff37dffb08346feb5818c914f8b6e11c7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java
@@ -71,7 +71,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
- private ServerConfig nettyServerConfig;
+ private ServerConfig mqttServerConfig;
private ExecutorService publicExecutor;
private ChannelEventListener channelEventListener;
@@ -89,32 +89,32 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
super();
}
- public MqttRemotingServer(final ServerConfig nettyServerConfig) {
- this(nettyServerConfig, null);
+ public MqttRemotingServer(final ServerConfig mqttServerConfig) {
+ this(mqttServerConfig, null);
}
- public MqttRemotingServer(final ServerConfig nettyServerConfig,
+ public MqttRemotingServer(final ServerConfig mqttServerConfig,
final ChannelEventListener channelEventListener) {
- init(nettyServerConfig, channelEventListener);
+ init(mqttServerConfig, channelEventListener);
}
@Override
public RemotingServer init(ServerConfig serverConfig,
ChannelEventListener channelEventListener) {
- this.nettyServerConfig = serverConfig;
- super.init(nettyServerConfig.getServerOnewaySemaphoreValue(),
- nettyServerConfig.getServerAsyncSemaphoreValue());
+ this.mqttServerConfig = serverConfig;
+ super.init(mqttServerConfig.getServerOnewaySemaphoreValue(),
+ mqttServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.channelEventListener = channelEventListener;
- int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
+ int publicThreadNums = mqttServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "MqttRemoting-PublicExecutor", true);
- if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
+ if (JvmUtils.isUseEpoll() && this.mqttServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
@@ -134,7 +134,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
- this.port = nettyServerConfig.getMqttListenPort();
+ this.port = mqttServerConfig.getListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
@@ -169,9 +169,9 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
- nettyServerConfig.getServerSocketSndBufSize())
+ mqttServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
- nettyServerConfig.getServerSocketRcvBufSize())
+ mqttServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer() {
@Override
@@ -184,11 +184,11 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
- new IdleStateHandler(nettyServerConfig
+ new IdleStateHandler(mqttServerConfig
.getConnectionChannelReaderIdleSeconds(),
- nettyServerConfig
+ mqttServerConfig
.getConnectionChannelWriterIdleSeconds(),
- nettyServerConfig
+ mqttServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
@@ -197,7 +197,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
}
});
- if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
+ if (mqttServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
index 6b3cb35cbadc7835ef6815325c9c555c2a5b49de..2c17f912c5a4b941f7355b94ecbe1cd5538c376a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
@@ -45,5 +45,4 @@ public class EncodeDecodeDispatcher {
public static Map getEncodeDecodeDispatcher() {
return encodeDecodeDispatcher;
}
-
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
index 61586742889c37285aa9b514165e5c5dbe2cc51a..b5a394f2f948ab207f202862a10b42aa9ca5eeae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@@ -53,7 +54,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload());
+ requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttConnectMessage) mqttMessage).payload()));
return requestCommand;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
index 1b30acd10701db68a2755bdbfb06df5410129a0f..3476438c210041359c03cff1820a088974b9aa33 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
@@ -48,9 +49,12 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- //invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead
- requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy());
+ ByteBuf payload = ((MqttPublishMessage) mqttMessage).payload();
+ byte[] body = new byte[payload.readableBytes()];
+ payload.readBytes(body);
+ requestCommand.setBody(body);
return requestCommand;
+
}
@Override
@@ -60,6 +64,6 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
- new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload());
+ new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), Unpooled.copiedBuffer(remotingCommand.getBody()));
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
index c87bc14979e9baa27f5e49627eeadbfd1d4daaac..a8353a88ec8990b407c4c794a720775e3fb96b14 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
@@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
@@ -43,6 +44,6 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
- MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload());
+ MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) MqttEncodeDecodeUtil.decode(remotingCommand.getBody(),MqttSubAckPayload.class));
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
index dec009cf49d8573c07fc3b0441e4174c8a770d25..09490370cb8ca2e565718ede651b1879af6d50c0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@@ -44,7 +45,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload());
+ requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttSubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
index 957754bbb9262e006516f839f1053672c0ea4d78..49967f81c608d612400019e7dd1c7b0e74c0d4ae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@@ -42,7 +43,7 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader);
- requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload());
+ requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttUnsubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..0eb1730a86211e1cbd6dbd814d7f575db9c17923
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.util;
+
+import com.google.gson.Gson;
+import java.nio.charset.Charset;
+
+public class MqttEncodeDecodeUtil {
+ private static final Gson GSON = new Gson();
+
+ public static byte[] encode(Object object) {
+ final String json = GSON.toJson(object);
+ if (json != null) {
+ return json.getBytes(Charset.forName("UTF-8"));
+ }
+ return null;
+ }
+
+ public static Object decode(byte[] body, Class classOfT) {
+ final String json = new String(body, Charset.forName("UTF-8"));
+ return GSON.fromJson(json, classOfT);
+ }
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 8716cdd9b0ce451945b68f7db68dfb74b80ccf35..6f54ae0225c9845a17a0e2bfee45397ca9788e46 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -85,8 +86,11 @@ public class SnodeController {
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
+ private final MqttConfig mqttConfig;
private final ServerConfig nettyServerConfig;
private final ClientConfig nettyClientConfig;
+ private final ServerConfig mqttServerConfig;
+ private final ClientConfig mqttClientConfig;
private RemotingClient remotingClient;
private RemotingServer snodeServer;
private RemotingClient mqttRemotingClient;
@@ -125,12 +129,13 @@ public class SnodeController {
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
- public SnodeController(ServerConfig nettyServerConfig,
- ClientConfig nettyClientConfig,
- SnodeConfig snodeConfig) {
- this.nettyClientConfig = nettyClientConfig;
- this.nettyServerConfig = nettyServerConfig;
+ public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) {
+ this.nettyClientConfig = snodeConfig.getNettyClientConfig();
+ this.nettyServerConfig = snodeConfig.getNettyServerConfig();
+ this.mqttServerConfig = mqttConfig.getMqttServerConfig();
+ this.mqttClientConfig = mqttConfig.getMqttClientConfig();
this.snodeConfig = snodeConfig;
+ this.mqttConfig = mqttConfig;
if (!this.snodeConfig.isEmbeddedModeEnable()) {
this.enodeService = new RemoteEnodeServiceImpl(this);
} else {
@@ -144,8 +149,9 @@ public class SnodeController {
}
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL);
+
if (this.mqttRemotingClient != null) {
- this.mqttRemotingClient.init(this.getNettyClientConfig(), null);
+ this.mqttRemotingClient.init(this.mqttClientConfig, null);
}
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
@@ -185,12 +191,12 @@ public class SnodeController {
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
- snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
- snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
+ mqttConfig.getHandleMqttMessageMinPoolSize(),
+ mqttConfig.getHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
- "SnodeHandleMqttMessageThread",
+ new ArrayBlockingQueue<>(mqttConfig.getHandleMqttThreadPoolQueueCapacity()),
+ "handleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
@@ -224,6 +230,10 @@ public class SnodeController {
return snodeConfig;
}
+ public MqttConfig getMqttConfig() {
+ return mqttConfig;
+ }
+
private void initRemotingServerInterceptorGroup() {
List remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
@@ -251,7 +261,7 @@ public class SnodeController {
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
- this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+ this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
registerProcessor();
@@ -350,7 +360,6 @@ public class SnodeController {
}
public void start() {
- initialize();
if (snodeServer != null) {
this.snodeServer.start();
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index a8d5b7ddab9d5bbc2005f3ecb45ba18091defa33..fffb66827a0128ea16272145f7bd47ed26b20c1e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -20,7 +20,9 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
+import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
@@ -32,6 +34,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -48,19 +51,22 @@ import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
- private static InternalLogger log;
+ private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
+ private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties";
+ private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE);
public static void main(String[] args) throws IOException, JoranException {
SnodeConfig snodeConfig = loadConfig(args);
+ MqttConfig mqttConfig = loadMqttConfig(snodeConfig);
if (snodeConfig.isEmbeddedModeEnable()) {
BrokerController brokerController = BrokerStartup.createBrokerController(args);
BrokerStartup.start(brokerController);
snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName());
}
- SnodeController snodeController = createSnodeController(snodeConfig);
+ SnodeController snodeController = createSnodeController(snodeConfig, mqttConfig);
startup(snodeController);
}
@@ -94,7 +100,6 @@ public class SnodeStartup {
SnodeConfig snodeConfig = new SnodeConfig();
final ServerConfig nettyServerConfig = new ServerConfig();
final ClientConfig nettyClientConfig = new ClientConfig();
-
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
@@ -142,12 +147,25 @@ public class SnodeStartup {
return snodeConfig;
}
- public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException {
+ public static MqttConfig loadMqttConfig(SnodeConfig snodeConfig) throws IOException {
+ MqttConfig mqttConfig = new MqttConfig();
+ final ServerConfig mqttServerConfig = new ServerConfig();
+ final ClientConfig mqttClientConfig = new ClientConfig();
+ mqttServerConfig.setListenPort(mqttConfig.getListenPort());
+ String file = snodeConfig.getRocketmqHome() + File.separator + mqttConfigFileName;
+ loadMqttProperties(file, mqttServerConfig, mqttClientConfig);
+ mqttConfig.setMqttServerConfig(mqttServerConfig);
+ mqttConfig.setMqttClientConfig(mqttClientConfig);
+
+ MixAll.printObjectProperties(log, mqttConfig);
+ MixAll.printObjectProperties(log, mqttConfig.getMqttServerConfig());
+ MixAll.printObjectProperties(log, mqttConfig.getMqttClientConfig());
+ return mqttConfig;
+ }
+
+ public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException {
- final SnodeController snodeController = new SnodeController(
- snodeConfig.getNettyServerConfig(),
- snodeConfig.getNettyClientConfig(),
- snodeConfig);
+ final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig);
boolean initResult = snodeController.initialize();
if (!initResult) {
@@ -199,5 +217,21 @@ public class SnodeStartup {
return options;
}
+
+ private static void loadMqttProperties(String file, ServerConfig mqttServerConfig,
+ ClientConfig mqttClientConfig) throws IOException {
+ InputStream in;
+ try {
+ in = new BufferedInputStream(new FileInputStream(file));
+ Properties properties = new Properties();
+ properties.load(in);
+ MixAll.properties2Object(properties, mqttServerConfig);
+ MixAll.properties2Object(properties, mqttClientConfig);
+ in.close();
+ } catch (FileNotFoundException e) {
+ log.info("The mqtt config file is not found. filePath={}", file);
+ }
+
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
index 8ae898443211684b8ad7f3c69f0dd17401eb5f4b..9eb62c240cd8fd8337719d8c46e4f81c8c162414 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.snode.processor;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
@@ -41,6 +41,7 @@ import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
@@ -98,16 +99,18 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
- MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
+// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
+ MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
break;
case PUBLISH:
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
- mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload());
+ mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
break;
case SUBSCRIBE:
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId());
- mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload());
+ MqttSubscribePayload mqttSubscribePayload = (MqttSubscribePayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttSubscribePayload.class);
+ mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
break;
case UNSUBSCRIBE:
case PINGREQ:
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
index 5999996701cb32ed83ffdad164c5aed4ba7a66fc..34f91d367cd89ba2f3e5d816067759f1b46534ad 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
@@ -104,7 +105,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
List grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager);
//Publish retained messages to subscribers.
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss);
- command.setPayload(mqttSubAckPayload);
+ command.setBody(MqttEncodeDecodeUtil.encode(mqttSubAckPayload));
mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size());
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
index ea8b9734e4a3b82a1ae36616a775f5c47504de50..e853025996dd2afb4eec14cff42d906599fd93c1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
@@ -53,12 +53,12 @@ public class MqttPushServiceImpl {
public MqttPushServiceImpl(final SnodeController snodeController) {
this.snodeController = snodeController;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
- this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(),
- this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(),
+ this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(),
+ this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()),
- "SnodePushMqttMessageThread",
+ new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()),
+ "pushMqttMessageThread",
false);
}
@@ -106,12 +106,14 @@ public class MqttPushServiceImpl {
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
- requestCommand.setPayload(message.copy());
+ byte[] body = new byte[message.readableBytes()];
+ message.readBytes(body);
+ requestCommand.setBody(body);
snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
- }finally {
+ } finally {
System.out.println("Release Bytebuf");
ReferenceCountUtil.release(message);
}
@@ -136,7 +138,6 @@ public class MqttPushServiceImpl {
mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
-// pushMessage.setPayload(message);
return pushMessage;
}
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
index 414f06bb9e8220569ddb5d23f8b9262d82783b9e..a8c35a7231363efe2a29275b597743f13f3d7ef5 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.snode;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.junit.Test;
@@ -29,10 +29,7 @@ public class SnodeControllerTest {
public void testSnodeRestart() {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setListenPort(10912);
- SnodeController snodeController = new SnodeController(
- serverConfig,
- new ClientConfig(),
- new SnodeConfig());
+ SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
@@ -44,10 +41,7 @@ public class SnodeControllerTest {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setAclEnable(true);
- SnodeController snodeController = new SnodeController(
- new ServerConfig(),
- new ClientConfig(),
- snodeConfig);
+ SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
index 6f75a3266aecd257d68c4e5cd3489cec6ff43988..076a005257906b413eef205dd86ae72e6690dc3b 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
@@ -20,14 +20,14 @@ import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
@@ -88,7 +88,7 @@ public class DefaultMqttMessageProcessorTest {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
- request.setPayload(payload);
+ request.setBody(MqttEncodeDecodeUtil.encode(payload));
return request;
}
}
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
index 8846ef1f1d8cf4ccc9d5675e8fff8b87107fa6ed..b0301e7e9b59d4085fe7b5707ffbac6117a9d98a 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
@@ -22,10 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
@@ -42,12 +41,10 @@ public class MqttConnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
- MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(
- new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()));
-
+ MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig()));
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
- MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes()));
+ MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes()));
mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel);
}
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java
index 4bb73dfc8f8809ebe9332cacfe9a7d024975cef8..0f474b1307fd5231ca860b5f5801e400f447c069 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java
@@ -20,11 +20,10 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
@@ -43,8 +42,7 @@ public class MqttDisconnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
- SnodeController snodeController = new SnodeController(new ServerConfig(),
- new ClientConfig(), new SnodeConfig());
+ SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController);
Client client = new Client();
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index a97f169d04af8a939a149250b48d0b7ade3f53d8..db70f982fcf6c5ffd6e9d4a9eb08e201c0a18632 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -48,7 +47,7 @@ public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor;
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
@@ -56,7 +55,7 @@ public class SendMessageProcessorTest {
private String topic = "snodeTopic";
private String group = "snodeGroup";
-
+
@Mock
private EnodeService enodeService;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
index ff736c294a6378d9f8f8bd719ccf80bbd1b12e8e..b39a0ae831c44836b0e9f1c1c4d054e2c1185b49 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
@@ -18,9 +18,8 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
-import org.apache.rocketmq.remoting.ClientConfig;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -46,7 +45,7 @@ import static org.mockito.Mockito.when;
public class NnodeServiceImplTest extends SnodeTestBase {
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NettyRemotingClient remotingClient;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
index 582c16e17fa9110e47b313ed5b149fc88fad101a..4e529751d3294ec759922adf3d2d099c5104bd0e 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
@@ -56,7 +55,7 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NnodeService nnodeService;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
index 8b9457cae5971c6b43b489f3532712aff3a7171c..048330dc214cf5eac8e5b0151badd41a8d5fc683 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
@@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.snode.service;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
-import org.apache.rocketmq.remoting.ClientConfig;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
@@ -38,7 +37,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SlowConsumerServiceImplTest {
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private final String enodeName = "testEndoe";
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java
index f9ca3536a3ec9fa189a0491b7779c5a5aa9d7207..57f7c7a52346c10fa1aa36d21e613238a0039870 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java
@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.snode.service;
+import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
-import org.apache.rocketmq.remoting.ClientConfig;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
@@ -33,8 +32,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class WillMessageServiceImplTest extends SnodeTestBase {
@Spy
- private SnodeController snodeController = new SnodeController(new ServerConfig(),
- new ClientConfig(), new SnodeConfig());
+ private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private WillMessageService willMessageService;