From c36f09aa56031881d9bca139f8d9bc8266d9b057 Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Tue, 26 Feb 2019 23:08:21 +0800 Subject: [PATCH] 1.add mqttServerConfig/mqttClientConfig 2.delete payload from RemotingCommand --- .../apache/rocketmq/common/MqttConfig.java | 135 +++++++----------- .../apache/rocketmq/common/SnodeConfig.java | 60 -------- pom.xml | 5 + remoting/pom.xml | 11 +- .../rocketmq/remoting/ServerConfig.java | 9 -- .../remoting/protocol/RemotingCommand.java | 10 -- .../transport/mqtt/MqttRemotingClient.java | 49 ++++--- .../transport/mqtt/MqttRemotingServer.java | 34 ++--- .../dispatcher/EncodeDecodeDispatcher.java | 1 - .../dispatcher/MqttConnectEncodeDecode.java | 3 +- .../dispatcher/MqttPublishEncodeDecode.java | 10 +- .../dispatcher/MqttSubackEncodeDecode.java | 3 +- .../dispatcher/MqttSubscribeEncodeDecode.java | 3 +- .../MqttUnSubscribeEncodeDecode.java | 3 +- .../remoting/util/MqttEncodeDecodeUtil.java | 38 +++++ .../rocketmq/snode/SnodeController.java | 33 +++-- .../apache/rocketmq/snode/SnodeStartup.java | 50 +++++-- .../DefaultMqttMessageProcessor.java | 11 +- .../MqttSubscribeMessageHandler.java | 3 +- .../service/impl/MqttPushServiceImpl.java | 15 +- .../rocketmq/snode/SnodeControllerTest.java | 12 +- .../DefaultMqttMessageProcessorTest.java | 8 +- .../MqttConnectMessageHandlerTest.java | 9 +- .../MqttDisconnectMessageHandlerTest.java | 6 +- .../processor/SendMessageProcessorTest.java | 7 +- .../snode/service/NnodeServiceImplTest.java | 5 +- .../service/RemoteEnodeServiceImplTest.java | 5 +- .../service/SlowConsumerServiceImplTest.java | 5 +- .../service/WillMessageServiceImplTest.java | 6 +- 29 files changed, 263 insertions(+), 286 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java index 270a74c2..fc9128d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -16,32 +16,32 @@ */ package org.apache.rocketmq.common; -import java.net.InetAddress; -import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; public class MqttConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + private ServerConfig mqttServerConfig; - private int snodeHandleMqttThreadPoolQueueCapacity = 10000; + private ClientConfig mqttClientConfig; - private int snodeHandleMqttMessageMinPoolSize = 10; + private int handleMqttThreadPoolQueueCapacity = 10000; - private int snodeHandleMqttMessageMaxPoolSize = 20; + private int handleMqttMessageMinPoolSize = 10; - private long houseKeepingInterval = 10 * 1000; + private int handleMqttMessageMaxPoolSize = 20; - private int snodePushMqttMessageMinPoolSize = 10; + private int pushMqttMessageMinPoolSize = 10; - private int snodePushMqttMessageMaxPoolSize = 20; + private int pushMqttMessageMaxPoolSize = 20; - private int snodePushMqttMessageThreadPoolQueueCapacity = 10000; + private int pushMqttMessageThreadPoolQueueCapacity = 10000; private int listenPort = 1883; /** @@ -50,119 +50,84 @@ public class MqttConfig { @ImportantField private boolean aclEnable = false; - public long getHouseKeepingInterval() { - return houseKeepingInterval; - } - - public void setHouseKeepingInterval(long houseKeepingInterval) { - this.houseKeepingInterval = houseKeepingInterval; - } - - /** - * This configurable item defines interval of topics registration of broker to name server. Allowing values are - * between 10, 000 and 60, 000 milliseconds. - */ - private int registerNameServerPeriod = 1000 * 30; - - public int getRegisterNameServerPeriod() { - return registerNameServerPeriod; - } - - public void setRegisterNameServerPeriod(int registerNameServerPeriod) { - this.registerNameServerPeriod = registerNameServerPeriod; - } - - @ImportantField - private boolean fetchNamesrvAddrByAddressServer = false; - - public static String localHostName() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - log.error("Failed to obtain the host name", e); - } - - return "DEFAULT_SNODE"; + public int getListenPort() { + return listenPort; } - public boolean isFetchNamesrvAddrByAddressServer() { - return fetchNamesrvAddrByAddressServer; + public void setListenPort(int listenPort) { + this.listenPort = listenPort; } - public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) { - this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer; - } - public int getListenPort() { - return listenPort; + public boolean isAclEnable() { + return aclEnable; } - public String getRocketmqHome() { - return rocketmqHome; + public void setAclEnable(boolean aclEnable) { + this.aclEnable = aclEnable; } - public void setRocketmqHome(String rocketmqHome) { - this.rocketmqHome = rocketmqHome; + public ServerConfig getMqttServerConfig() { + return mqttServerConfig; } - public void setListenPort(int listenPort) { - this.listenPort = listenPort; + public void setMqttServerConfig(ServerConfig mqttServerConfig) { + this.mqttServerConfig = mqttServerConfig; } - public int getSnodeHandleMqttThreadPoolQueueCapacity() { - return snodeHandleMqttThreadPoolQueueCapacity; + public ClientConfig getMqttClientConfig() { + return mqttClientConfig; } - public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) { - this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; + public void setMqttClientConfig(ClientConfig mqttClientConfig) { + this.mqttClientConfig = mqttClientConfig; } - public int getSnodeHandleMqttMessageMinPoolSize() { - return snodeHandleMqttMessageMinPoolSize; + public int getHandleMqttThreadPoolQueueCapacity() { + return handleMqttThreadPoolQueueCapacity; } - public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) { - this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize; + public void setHandleMqttThreadPoolQueueCapacity(int handleMqttThreadPoolQueueCapacity) { + this.handleMqttThreadPoolQueueCapacity = handleMqttThreadPoolQueueCapacity; } - public int getSnodeHandleMqttMessageMaxPoolSize() { - return snodeHandleMqttMessageMaxPoolSize; + public int getHandleMqttMessageMinPoolSize() { + return handleMqttMessageMinPoolSize; } - public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) { - this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize; + public void setHandleMqttMessageMinPoolSize(int handleMqttMessageMinPoolSize) { + this.handleMqttMessageMinPoolSize = handleMqttMessageMinPoolSize; } - public int getSnodePushMqttMessageMinPoolSize() { - return snodePushMqttMessageMinPoolSize; + public int getHandleMqttMessageMaxPoolSize() { + return handleMqttMessageMaxPoolSize; } - public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) { - this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize; + public void setHandleMqttMessageMaxPoolSize(int handleMqttMessageMaxPoolSize) { + this.handleMqttMessageMaxPoolSize = handleMqttMessageMaxPoolSize; } - public int getSnodePushMqttMessageMaxPoolSize() { - return snodePushMqttMessageMaxPoolSize; + public int getPushMqttMessageMinPoolSize() { + return pushMqttMessageMinPoolSize; } - public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) { - this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize; + public void setPushMqttMessageMinPoolSize(int pushMqttMessageMinPoolSize) { + this.pushMqttMessageMinPoolSize = pushMqttMessageMinPoolSize; } - public int getSnodePushMqttMessageThreadPoolQueueCapacity() { - return snodePushMqttMessageThreadPoolQueueCapacity; + public int getPushMqttMessageMaxPoolSize() { + return pushMqttMessageMaxPoolSize; } - public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) { - this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity; + public void setPushMqttMessageMaxPoolSize(int pushMqttMessageMaxPoolSize) { + this.pushMqttMessageMaxPoolSize = pushMqttMessageMaxPoolSize; } - public boolean isAclEnable() { - return aclEnable; + public int getPushMqttMessageThreadPoolQueueCapacity() { + return pushMqttMessageThreadPoolQueueCapacity; } - public void setAclEnable(boolean aclEnable) { - this.aclEnable = aclEnable; + public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) { + this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity; } - } diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index fc1b7e6a..831219bd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -60,12 +60,6 @@ public class SnodeConfig { private int snodeSendMessageMaxPoolSize = 20; - private int snodeHandleMqttThreadPoolQueueCapacity = 10000; - - private int snodeHandleMqttMessageMinPoolSize = 10; - - private int snodeHandleMqttMessageMaxPoolSize = 20; - private int snodeHeartBeatCorePoolSize = 1; private int snodeHeartBeatMaxPoolSize = 2; @@ -88,12 +82,6 @@ public class SnodeConfig { private int snodePushMessageThreadPoolQueueCapacity = 10000; - private int snodePushMqttMessageMinPoolSize = 10; - - private int snodePushMqttMessageMaxPoolSize = 20; - - private int snodePushMqttMessageThreadPoolQueueCapacity = 10000; - private int slowConsumerThreshold = 1024; private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor"; @@ -277,30 +265,6 @@ public class SnodeConfig { this.snodeId = snodeId; } - public int getSnodeHandleMqttThreadPoolQueueCapacity() { - return snodeHandleMqttThreadPoolQueueCapacity; - } - - public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) { - this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; - } - - public int getSnodeHandleMqttMessageMinPoolSize() { - return snodeHandleMqttMessageMinPoolSize; - } - - public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) { - this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize; - } - - public int getSnodeHandleMqttMessageMaxPoolSize() { - return snodeHandleMqttMessageMaxPoolSize; - } - - public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) { - this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize; - } - public String getSnodeName() { return snodeName; } @@ -365,30 +329,6 @@ public class SnodeConfig { this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity; } - public int getSnodePushMqttMessageMinPoolSize() { - return snodePushMqttMessageMinPoolSize; - } - - public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) { - this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize; - } - - public int getSnodePushMqttMessageMaxPoolSize() { - return snodePushMqttMessageMaxPoolSize; - } - - public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) { - this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize; - } - - public int getSnodePushMqttMessageThreadPoolQueueCapacity() { - return snodePushMqttMessageThreadPoolQueueCapacity; - } - - public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) { - this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity; - } - public String getSendMessageInterceptorPath() { return sendMessageInterceptorPath; } diff --git a/pom.xml b/pom.xml index 8b0ce422..3899247f 100644 --- a/pom.xml +++ b/pom.xml @@ -653,6 +653,11 @@ org.eclipse.paho.client.mqttv3 1.2.0 + + com.google.code.gson + gson + LATEST + diff --git a/remoting/pom.xml b/remoting/pom.xml index 9abb2b21..26885a56 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -15,7 +15,8 @@ limitations under the License. --> - + org.apache.rocketmq rocketmq-all @@ -38,8 +39,8 @@ fastjson - org.msgpack - msgpack + org.msgpack + msgpack io.netty @@ -62,5 +63,9 @@ com.google.guava guava + + com.google.code.gson + gson + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java index 97c3d3eb..dda2528f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java @@ -20,7 +20,6 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig; public class ServerConfig implements Cloneable { private int listenPort = 8888; - private int mqttListenPort = 1883; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 8; private int serverSelectorThreads = 3; @@ -76,14 +75,6 @@ public class ServerConfig implements Cloneable { this.listenPort = listenPort; } - public int getMqttListenPort() { - return mqttListenPort; - } - - public void setMqttListenPort(int mqttListenPort) { - this.mqttListenPort = mqttListenPort; - } - public int getServerWorkerThreads() { return serverWorkerThreads; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 7c008faf..e823a69c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -80,8 +80,6 @@ public class RemotingCommand { private transient byte[] body; - private Object payload; - public RemotingCommand() { } @@ -261,14 +259,6 @@ public class RemotingCommand { this.body = body; } - public Object getPayload() { - return payload; - } - - public void setPayload(Object payload) { - this.payload = payload; - } - public HashMap getExtFields() { return extFields; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java index 05dd1b25..725cfe74 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java @@ -60,7 +60,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils; public class MqttRemotingClient extends NettyRemotingClientAbstract implements RemotingClient { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); - private ClientConfig nettyClientConfig; + private ClientConfig mqttClientConfig; private Bootstrap bootstrap = new Bootstrap(); private EventLoopGroup eventLoopGroupWorker; @@ -76,33 +76,38 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R public MqttRemotingClient() { super(); + loadProperties(); } - public MqttRemotingClient(final ClientConfig nettyClientConfig) { - this(nettyClientConfig, null); + private void loadProperties() { + + } + + public MqttRemotingClient(final ClientConfig mqttClientConfig) { + this(mqttClientConfig, null); } - public MqttRemotingClient(final ClientConfig nettyClientConfig, + public MqttRemotingClient(final ClientConfig mqttClientConfig, final ChannelEventListener channelEventListener) { - super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); - init(nettyClientConfig, channelEventListener); + super(mqttClientConfig.getClientOnewaySemaphoreValue(), mqttClientConfig.getClientAsyncSemaphoreValue()); + init(mqttClientConfig, channelEventListener); } @Override - public RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { - this.nettyClientConfig = nettyClientConfig; + public RemotingClient init(ClientConfig mqttClientConfig, ChannelEventListener channelEventListener) { + this.mqttClientConfig = mqttClientConfig; this.channelEventListener = channelEventListener; - this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", - nettyClientConfig.getClientWorkerThreads())); + this.eventLoopGroupWorker = new NioEventLoopGroup(mqttClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", + mqttClientConfig.getClientWorkerThreads())); this.publicExecutor = ThreadUtils.newFixedThreadPool( - nettyClientConfig.getClientCallbackExecutorThreads(), - 10000, "Remoting-PublicExecutor", true); - this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), - ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads())); - if (nettyClientConfig.isUseTLS()) { + mqttClientConfig.getClientCallbackExecutorThreads(), + 10000, "MqttRemoting-PublicExecutor", true); + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(mqttClientConfig.getClientWorkerThreads(), + ThreadUtils.newGenericThreadFactory("MqttNettyClientWorkerThreads", mqttClientConfig.getClientWorkerThreads())); + if (mqttClientConfig.isUseTLS()) { try { sslContext = TlsHelper.buildSslContext(true); - log.info("SSL enabled for client"); + log.info("SSL enabled for mqtt client"); } catch (IOException e) { log.error("Failed to create SSLContext", e); } catch (CertificateException e) { @@ -123,14 +128,14 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) - .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) - .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mqttClientConfig.getConnectTimeoutMillis()) + .option(ChannelOption.SO_SNDBUF, mqttClientConfig.getClientSocketSndBufSize()) + .option(ChannelOption.SO_RCVBUF, mqttClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - if (nettyClientConfig.isUseTLS()) { + if (mqttClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); @@ -142,7 +147,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R defaultEventExecutorGroup, MqttEncoder.INSTANCE, new MqttDecoder(), - new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), + new IdleStateHandler(0, 0, mqttClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } @@ -199,7 +204,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R throw (RemotingSendRequestException) remotingException; } if (remotingException instanceof RemotingTimeoutException) { - if (nettyClientConfig.isClientCloseSocketIfTimeout()) { + if (mqttClientConfig.isClientCloseSocketIfTimeout()) { this.closeRemotingChannel(addr, remotingChannel); log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java index 6d4fc20a..3a5c688f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java @@ -71,7 +71,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R private ServerBootstrap serverBootstrap; private EventLoopGroup eventLoopGroupSelector; private EventLoopGroup eventLoopGroupBoss; - private ServerConfig nettyServerConfig; + private ServerConfig mqttServerConfig; private ExecutorService publicExecutor; private ChannelEventListener channelEventListener; @@ -89,32 +89,32 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R super(); } - public MqttRemotingServer(final ServerConfig nettyServerConfig) { - this(nettyServerConfig, null); + public MqttRemotingServer(final ServerConfig mqttServerConfig) { + this(mqttServerConfig, null); } - public MqttRemotingServer(final ServerConfig nettyServerConfig, + public MqttRemotingServer(final ServerConfig mqttServerConfig, final ChannelEventListener channelEventListener) { - init(nettyServerConfig, channelEventListener); + init(mqttServerConfig, channelEventListener); } @Override public RemotingServer init(ServerConfig serverConfig, ChannelEventListener channelEventListener) { - this.nettyServerConfig = serverConfig; - super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), - nettyServerConfig.getServerAsyncSemaphoreValue()); + this.mqttServerConfig = serverConfig; + super.init(mqttServerConfig.getServerOnewaySemaphoreValue(), + mqttServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.channelEventListener = channelEventListener; - int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); + int publicThreadNums = mqttServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } this.publicExecutor = ThreadUtils.newFixedThreadPool( publicThreadNums, 10000, "MqttRemoting-PublicExecutor", true); - if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) { + if (JvmUtils.isUseEpoll() && this.mqttServerConfig.isUseEpollNativeSelector()) { this.eventLoopGroupSelector = new EpollEventLoopGroup( serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads", @@ -134,7 +134,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R serverConfig.getServerSelectorThreads())); this.socketChannelClass = NioServerSocketChannel.class; } - this.port = nettyServerConfig.getMqttListenPort(); + this.port = mqttServerConfig.getListenPort(); this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( serverConfig.getServerWorkerThreads(), ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads", @@ -169,9 +169,9 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, - nettyServerConfig.getServerSocketSndBufSize()) + mqttServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, - nettyServerConfig.getServerSocketRcvBufSize()) + mqttServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.port)) .childHandler(new ChannelInitializer() { @Override @@ -184,11 +184,11 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R MqttEncoder.INSTANCE, new MqttMessage2RemotingCommandHandler(), new RemotingCommand2MqttMessageHandler(), - new IdleStateHandler(nettyServerConfig + new IdleStateHandler(mqttServerConfig .getConnectionChannelReaderIdleSeconds(), - nettyServerConfig + mqttServerConfig .getConnectionChannelWriterIdleSeconds(), - nettyServerConfig + mqttServerConfig .getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() @@ -197,7 +197,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R } }); - if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { + if (mqttServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java index 6b3cb35c..2c17f912 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java @@ -45,5 +45,4 @@ public class EncodeDecodeDispatcher { public static Map getEncodeDecodeDispatcher() { return encodeDecodeDispatcher; } - } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java index 61586742..b5a394f2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { @@ -53,7 +54,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload()); + requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttConnectMessage) mqttMessage).payload())); return requestCommand; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java index 1b30acd1..3476438c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -48,9 +49,12 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - //invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead - requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy()); + ByteBuf payload = ((MqttPublishMessage) mqttMessage).payload(); + byte[] body = new byte[payload.readableBytes()]; + payload.readBytes(body); + requestCommand.setBody(body); return requestCommand; + } @Override @@ -60,6 +64,6 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()), - new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload()); + new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), Unpooled.copiedBuffer(remotingCommand.getBody())); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java index c87bc149..a8353a88 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java @@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingException; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { @@ -43,6 +44,6 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()), - MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload()); + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) MqttEncodeDecodeUtil.decode(remotingCommand.getBody(),MqttSubAckPayload.class)); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java index dec009cf..09490370 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { @@ -44,7 +45,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload()); + requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttSubscribeMessage) mqttMessage).payload())); return requestCommand; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java index 957754bb..49967f81 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode { @@ -42,7 +43,7 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode mqttHeader.setMessageId(variableHeader.messageId()); requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader); - requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload()); + requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttUnsubscribeMessage) mqttMessage).payload())); return requestCommand; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java new file mode 100644 index 00000000..0eb1730a --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.util; + +import com.google.gson.Gson; +import java.nio.charset.Charset; + +public class MqttEncodeDecodeUtil { + private static final Gson GSON = new Gson(); + + public static byte[] encode(Object object) { + final String json = GSON.toJson(object); + if (json != null) { + return json.getBytes(Charset.forName("UTF-8")); + } + return null; + } + + public static Object decode(byte[] body, Class classOfT) { + final String json = new String(body, Charset.forName("UTF-8")); + return GSON.fromJson(json, classOfT); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 8716cdd9..6f54ae02 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -85,8 +86,11 @@ public class SnodeController { .getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeConfig snodeConfig; + private final MqttConfig mqttConfig; private final ServerConfig nettyServerConfig; private final ClientConfig nettyClientConfig; + private final ServerConfig mqttServerConfig; + private final ClientConfig mqttClientConfig; private RemotingClient remotingClient; private RemotingServer snodeServer; private RemotingClient mqttRemotingClient; @@ -125,12 +129,13 @@ public class SnodeController { .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "SnodeControllerScheduledThread")); - public SnodeController(ServerConfig nettyServerConfig, - ClientConfig nettyClientConfig, - SnodeConfig snodeConfig) { - this.nettyClientConfig = nettyClientConfig; - this.nettyServerConfig = nettyServerConfig; + public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) { + this.nettyClientConfig = snodeConfig.getNettyClientConfig(); + this.nettyServerConfig = snodeConfig.getNettyServerConfig(); + this.mqttServerConfig = mqttConfig.getMqttServerConfig(); + this.mqttClientConfig = mqttConfig.getMqttClientConfig(); this.snodeConfig = snodeConfig; + this.mqttConfig = mqttConfig; if (!this.snodeConfig.isEmbeddedModeEnable()) { this.enodeService = new RemoteEnodeServiceImpl(this); } else { @@ -144,8 +149,9 @@ public class SnodeController { } this.mqttRemotingClient = RemotingClientFactory.getInstance() .createRemotingClient(RemotingUtil.MQTT_PROTOCOL); + if (this.mqttRemotingClient != null) { - this.mqttRemotingClient.init(this.getNettyClientConfig(), null); + this.mqttRemotingClient.init(this.mqttClientConfig, null); } this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( @@ -185,12 +191,12 @@ public class SnodeController { false); this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), - snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), + mqttConfig.getHandleMqttMessageMinPoolSize(), + mqttConfig.getHandleMqttMessageMaxPoolSize(), 3000, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), - "SnodeHandleMqttMessageThread", + new ArrayBlockingQueue<>(mqttConfig.getHandleMqttThreadPoolQueueCapacity()), + "handleMqttMessageThread", false); if (this.snodeConfig.getNamesrvAddr() != null) { @@ -224,6 +230,10 @@ public class SnodeController { return snodeConfig; } + public MqttConfig getMqttConfig() { + return mqttConfig; + } + private void initRemotingServerInterceptorGroup() { List remotingServerInterceptors = InterceptorFactory.getInstance() .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); @@ -251,7 +261,7 @@ public class SnodeController { this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( RemotingUtil.MQTT_PROTOCOL); if (this.mqttRemotingServer != null) { - this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); + this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); } registerProcessor(); @@ -350,7 +360,6 @@ public class SnodeController { } public void start() { - initialize(); if (snodeServer != null) { this.snodeServer.start(); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index a8d5b7dd..fffb6682 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -20,7 +20,9 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import java.io.BufferedInputStream; +import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; @@ -32,6 +34,7 @@ import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -48,19 +51,22 @@ import org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE; public class SnodeStartup { - private static InternalLogger log; + private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); public static Properties properties = null; public static CommandLine commandLine = null; public static String configFile = null; + private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties"; + private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE); public static void main(String[] args) throws IOException, JoranException { SnodeConfig snodeConfig = loadConfig(args); + MqttConfig mqttConfig = loadMqttConfig(snodeConfig); if (snodeConfig.isEmbeddedModeEnable()) { BrokerController brokerController = BrokerStartup.createBrokerController(args); BrokerStartup.start(brokerController); snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName()); } - SnodeController snodeController = createSnodeController(snodeConfig); + SnodeController snodeController = createSnodeController(snodeConfig, mqttConfig); startup(snodeController); } @@ -94,7 +100,6 @@ public class SnodeStartup { SnodeConfig snodeConfig = new SnodeConfig(); final ServerConfig nettyServerConfig = new ServerConfig(); final ClientConfig nettyClientConfig = new ClientConfig(); - nettyServerConfig.setListenPort(snodeConfig.getListenPort()); nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); @@ -142,12 +147,25 @@ public class SnodeStartup { return snodeConfig; } - public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException { + public static MqttConfig loadMqttConfig(SnodeConfig snodeConfig) throws IOException { + MqttConfig mqttConfig = new MqttConfig(); + final ServerConfig mqttServerConfig = new ServerConfig(); + final ClientConfig mqttClientConfig = new ClientConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + String file = snodeConfig.getRocketmqHome() + File.separator + mqttConfigFileName; + loadMqttProperties(file, mqttServerConfig, mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + mqttConfig.setMqttClientConfig(mqttClientConfig); + + MixAll.printObjectProperties(log, mqttConfig); + MixAll.printObjectProperties(log, mqttConfig.getMqttServerConfig()); + MixAll.printObjectProperties(log, mqttConfig.getMqttClientConfig()); + return mqttConfig; + } + + public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException { - final SnodeController snodeController = new SnodeController( - snodeConfig.getNettyServerConfig(), - snodeConfig.getNettyClientConfig(), - snodeConfig); + final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig); boolean initResult = snodeController.initialize(); if (!initResult) { @@ -199,5 +217,21 @@ public class SnodeStartup { return options; } + + private static void loadMqttProperties(String file, ServerConfig mqttServerConfig, + ClientConfig mqttClientConfig) throws IOException { + InputStream in; + try { + in = new BufferedInputStream(new FileInputStream(file)); + Properties properties = new Properties(); + properties.load(in); + MixAll.properties2Object(properties, mqttServerConfig); + MixAll.properties2Object(properties, mqttClientConfig); + in.close(); + } catch (FileNotFoundException e) { + log.info("The mqtt config file is not found. filePath={}", file); + } + + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 8ae89844..9eb62c24 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.snode.processor; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; @@ -41,6 +41,7 @@ import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; @@ -98,16 +99,18 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); - MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload(); +// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload(); + MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class); mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload); break; case PUBLISH: MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()); - mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload()); + mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody())); break; case SUBSCRIBE: MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()); - mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload()); + MqttSubscribePayload mqttSubscribePayload = (MqttSubscribePayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttSubscribePayload.class); + mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload); break; case UNSUBSCRIBE: case PINGREQ: diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java index 59999967..34f91d36 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; @@ -104,7 +105,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { List grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager); //Publish retained messages to subscribers. MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss); - command.setPayload(mqttSubAckPayload); + command.setBody(MqttEncodeDecodeUtil.encode(mqttSubAckPayload)); mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size()); command.setRemark(null); command.setCode(ResponseCode.SUCCESS); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java index ea8b9734..e8530259 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java @@ -53,12 +53,12 @@ public class MqttPushServiceImpl { public MqttPushServiceImpl(final SnodeController snodeController) { this.snodeController = snodeController; pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor( - this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(), - this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(), + this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(), + this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(), 3000, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()), - "SnodePushMqttMessageThread", + new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()), + "pushMqttMessageThread", false); } @@ -106,12 +106,14 @@ public class MqttPushServiceImpl { if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) { remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel()); } - requestCommand.setPayload(message.copy()); + byte[] body = new byte[message.readableBytes()]; + message.readBytes(body); + requestCommand.setBody(body); snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } } catch (Exception ex) { log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); - }finally { + } finally { System.out.println("Release Bytebuf"); ReferenceCountUtil.release(message); } @@ -136,7 +138,6 @@ public class MqttPushServiceImpl { mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes()); RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); -// pushMessage.setPayload(message); return pushMessage; } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java index 414f06bb..a8c35a72 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java @@ -16,8 +16,8 @@ */ package org.apache.rocketmq.snode; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; import org.junit.Test; @@ -29,10 +29,7 @@ public class SnodeControllerTest { public void testSnodeRestart() { ServerConfig serverConfig = new ServerConfig(); serverConfig.setListenPort(10912); - SnodeController snodeController = new SnodeController( - serverConfig, - new ClientConfig(), - new SnodeConfig()); + SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); assertThat(snodeController.initialize()); snodeController.start(); snodeController.shutdown(); @@ -44,10 +41,7 @@ public class SnodeControllerTest { System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); SnodeConfig snodeConfig = new SnodeConfig(); snodeConfig.setAclEnable(true); - SnodeController snodeController = new SnodeController( - new ServerConfig(), - new ClientConfig(), - snodeConfig); + SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); assertThat(snodeController.initialize()); snodeController.start(); snodeController.shutdown(); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 6f75a326..076a0052 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -20,14 +20,14 @@ import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; import org.apache.rocketmq.snode.SnodeController; import org.junit.Before; import org.junit.Test; @@ -41,7 +41,7 @@ public class DefaultMqttMessageProcessorTest { private DefaultMqttMessageProcessor defaultMqttMessageProcessor; @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); @Mock private RemotingChannel remotingChannel; @@ -88,7 +88,7 @@ public class DefaultMqttMessageProcessorTest { MqttHeader mqttHeader = createMqttConnectMesssageHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); - request.setPayload(payload); + request.setBody(MqttEncodeDecodeUtil.encode(payload)); return request; } } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java index 8846ef1f..b0301e7e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -22,10 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; @@ -42,12 +41,10 @@ public class MqttConnectMessageHandlerTest { @Test public void testHandlerMessage() throws Exception { - MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler( - new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig())); - + MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig())); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( - MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes())); + MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes())); mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java index 4bb73dfc..0f474b13 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java @@ -20,11 +20,10 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; @@ -43,8 +42,7 @@ public class MqttDisconnectMessageHandlerTest { @Test public void testHandlerMessage() throws Exception { - SnodeController snodeController = new SnodeController(new ServerConfig(), - new ClientConfig(), new SnodeConfig()); + SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( snodeController); Client client = new Client(); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index a97f169d..db70f982 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -48,7 +47,7 @@ public class SendMessageProcessorTest { private SendMessageProcessor sendMessageProcessor; @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); @Mock private RemotingChannel remotingChannel; @@ -56,7 +55,7 @@ public class SendMessageProcessorTest { private String topic = "snodeTopic"; private String group = "snodeGroup"; - + @Mock private EnodeService enodeService; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index ff736c29..b39a0ae8 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -18,9 +18,8 @@ package org.apache.rocketmq.snode.service; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -46,7 +45,7 @@ import static org.mockito.Mockito.when; public class NnodeServiceImplTest extends SnodeTestBase { @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); @Mock private NettyRemotingClient remotingClient; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java index 582c16e1..4e529751 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java @@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; @@ -56,7 +55,7 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase { private EnodeService enodeService; @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); @Mock private NnodeService nnodeService; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java index 8b9457ca..048330dc 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -16,9 +16,8 @@ */ package org.apache.rocketmq.snode.service; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; @@ -38,7 +37,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class SlowConsumerServiceImplTest { @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private final String enodeName = "testEndoe"; diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java index f9ca3536..57f7c7a5 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java @@ -16,10 +16,9 @@ */ package org.apache.rocketmq.snode.service; +import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; @@ -33,8 +32,7 @@ import org.mockito.junit.MockitoJUnitRunner; public class WillMessageServiceImplTest extends SnodeTestBase { @Spy - private SnodeController snodeController = new SnodeController(new ServerConfig(), - new ClientConfig(), new SnodeConfig()); + private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); private WillMessageService willMessageService; -- GitLab