提交 c36f09aa 编写于 作者: C chengxiangwang

1.add mqttServerConfig/mqttClientConfig 2.delete payload from RemotingCommand

上级 bd21a92d
......@@ -16,32 +16,32 @@
*/
package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
public class MqttConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private ServerConfig mqttServerConfig;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private ClientConfig mqttClientConfig;
private int snodeHandleMqttMessageMinPoolSize = 10;
private int handleMqttThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttMessageMaxPoolSize = 20;
private int handleMqttMessageMinPoolSize = 10;
private long houseKeepingInterval = 10 * 1000;
private int handleMqttMessageMaxPoolSize = 20;
private int snodePushMqttMessageMinPoolSize = 10;
private int pushMqttMessageMinPoolSize = 10;
private int snodePushMqttMessageMaxPoolSize = 20;
private int pushMqttMessageMaxPoolSize = 20;
private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
private int pushMqttMessageThreadPoolQueueCapacity = 10000;
private int listenPort = 1883;
/**
......@@ -50,119 +50,84 @@ public class MqttConfig {
@ImportantField
private boolean aclEnable = false;
public long getHouseKeepingInterval() {
return houseKeepingInterval;
}
public void setHouseKeepingInterval(long houseKeepingInterval) {
this.houseKeepingInterval = houseKeepingInterval;
}
/**
* This configurable item defines interval of topics registration of broker to name server. Allowing values are
* between 10, 000 and 60, 000 milliseconds.
*/
private int registerNameServerPeriod = 1000 * 30;
public int getRegisterNameServerPeriod() {
return registerNameServerPeriod;
}
public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
this.registerNameServerPeriod = registerNameServerPeriod;
}
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_SNODE";
public int getListenPort() {
return listenPort;
}
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
}
public int getListenPort() {
return listenPort;
public boolean isAclEnable() {
return aclEnable;
}
public String getRocketmqHome() {
return rocketmqHome;
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public void setRocketmqHome(String rocketmqHome) {
this.rocketmqHome = rocketmqHome;
public ServerConfig getMqttServerConfig() {
return mqttServerConfig;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
public void setMqttServerConfig(ServerConfig mqttServerConfig) {
this.mqttServerConfig = mqttServerConfig;
}
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
public ClientConfig getMqttClientConfig() {
return mqttClientConfig;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
public void setMqttClientConfig(ClientConfig mqttClientConfig) {
this.mqttClientConfig = mqttClientConfig;
}
public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize;
public int getHandleMqttThreadPoolQueueCapacity() {
return handleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
public void setHandleMqttThreadPoolQueueCapacity(int handleMqttThreadPoolQueueCapacity) {
this.handleMqttThreadPoolQueueCapacity = handleMqttThreadPoolQueueCapacity;
}
public int getSnodeHandleMqttMessageMaxPoolSize() {
return snodeHandleMqttMessageMaxPoolSize;
public int getHandleMqttMessageMinPoolSize() {
return handleMqttMessageMinPoolSize;
}
public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
public void setHandleMqttMessageMinPoolSize(int handleMqttMessageMinPoolSize) {
this.handleMqttMessageMinPoolSize = handleMqttMessageMinPoolSize;
}
public int getSnodePushMqttMessageMinPoolSize() {
return snodePushMqttMessageMinPoolSize;
public int getHandleMqttMessageMaxPoolSize() {
return handleMqttMessageMaxPoolSize;
}
public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
public void setHandleMqttMessageMaxPoolSize(int handleMqttMessageMaxPoolSize) {
this.handleMqttMessageMaxPoolSize = handleMqttMessageMaxPoolSize;
}
public int getSnodePushMqttMessageMaxPoolSize() {
return snodePushMqttMessageMaxPoolSize;
public int getPushMqttMessageMinPoolSize() {
return pushMqttMessageMinPoolSize;
}
public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
public void setPushMqttMessageMinPoolSize(int pushMqttMessageMinPoolSize) {
this.pushMqttMessageMinPoolSize = pushMqttMessageMinPoolSize;
}
public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
return snodePushMqttMessageThreadPoolQueueCapacity;
public int getPushMqttMessageMaxPoolSize() {
return pushMqttMessageMaxPoolSize;
}
public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
public void setPushMqttMessageMaxPoolSize(int pushMqttMessageMaxPoolSize) {
this.pushMqttMessageMaxPoolSize = pushMqttMessageMaxPoolSize;
}
public boolean isAclEnable() {
return aclEnable;
public int getPushMqttMessageThreadPoolQueueCapacity() {
return pushMqttMessageThreadPoolQueueCapacity;
}
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) {
this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity;
}
}
......@@ -60,12 +60,6 @@ public class SnodeConfig {
private int snodeSendMessageMaxPoolSize = 20;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttMessageMinPoolSize = 10;
private int snodeHandleMqttMessageMaxPoolSize = 20;
private int snodeHeartBeatCorePoolSize = 1;
private int snodeHeartBeatMaxPoolSize = 2;
......@@ -88,12 +82,6 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000;
private int snodePushMqttMessageMinPoolSize = 10;
private int snodePushMqttMessageMaxPoolSize = 20;
private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
private int slowConsumerThreshold = 1024;
private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
......@@ -277,30 +265,6 @@ public class SnodeConfig {
this.snodeId = snodeId;
}
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize;
}
public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
}
public int getSnodeHandleMqttMessageMaxPoolSize() {
return snodeHandleMqttMessageMaxPoolSize;
}
public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
}
public String getSnodeName() {
return snodeName;
}
......@@ -365,30 +329,6 @@ public class SnodeConfig {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
public int getSnodePushMqttMessageMinPoolSize() {
return snodePushMqttMessageMinPoolSize;
}
public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
}
public int getSnodePushMqttMessageMaxPoolSize() {
return snodePushMqttMessageMaxPoolSize;
}
public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
}
public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
return snodePushMqttMessageThreadPoolQueueCapacity;
}
public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
}
public String getSendMessageInterceptorPath() {
return sendMessageInterceptorPath;
}
......
......@@ -653,6 +653,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>LATEST</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -15,7 +15,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -38,8 +39,8 @@
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
......@@ -62,5 +63,9 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>
......@@ -20,7 +20,6 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
public class ServerConfig implements Cloneable {
private int listenPort = 8888;
private int mqttListenPort = 1883;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
......@@ -76,14 +75,6 @@ public class ServerConfig implements Cloneable {
this.listenPort = listenPort;
}
public int getMqttListenPort() {
return mqttListenPort;
}
public void setMqttListenPort(int mqttListenPort) {
this.mqttListenPort = mqttListenPort;
}
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
......
......@@ -80,8 +80,6 @@ public class RemotingCommand {
private transient byte[] body;
private Object payload;
public RemotingCommand() {
}
......@@ -261,14 +259,6 @@ public class RemotingCommand {
this.body = body;
}
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
public HashMap<String, String> getExtFields() {
return extFields;
}
......
......@@ -60,7 +60,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ClientConfig nettyClientConfig;
private ClientConfig mqttClientConfig;
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker;
......@@ -76,33 +76,38 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
public MqttRemotingClient() {
super();
loadProperties();
}
public MqttRemotingClient(final ClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
private void loadProperties() {
}
public MqttRemotingClient(final ClientConfig mqttClientConfig) {
this(mqttClientConfig, null);
}
public MqttRemotingClient(final ClientConfig nettyClientConfig,
public MqttRemotingClient(final ClientConfig mqttClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
init(nettyClientConfig, channelEventListener);
super(mqttClientConfig.getClientOnewaySemaphoreValue(), mqttClientConfig.getClientAsyncSemaphoreValue());
init(mqttClientConfig, channelEventListener);
}
@Override
public RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig;
public RemotingClient init(ClientConfig mqttClientConfig, ChannelEventListener channelEventListener) {
this.mqttClientConfig = mqttClientConfig;
this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
nettyClientConfig.getClientWorkerThreads()));
this.eventLoopGroupWorker = new NioEventLoopGroup(mqttClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
mqttClientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
nettyClientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
if (nettyClientConfig.isUseTLS()) {
mqttClientConfig.getClientCallbackExecutorThreads(),
10000, "MqttRemoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(mqttClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyClientWorkerThreads", mqttClientConfig.getClientWorkerThreads()));
if (mqttClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
log.info("SSL enabled for mqtt client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
......@@ -123,14 +128,14 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mqttClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, mqttClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, mqttClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (mqttClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
......@@ -142,7 +147,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
defaultEventExecutorGroup,
MqttEncoder.INSTANCE,
new MqttDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new IdleStateHandler(0, 0, mqttClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
......@@ -199,7 +204,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
throw (RemotingSendRequestException) remotingException;
}
if (remotingException instanceof RemotingTimeoutException) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
if (mqttClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
......
......@@ -71,7 +71,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
private ServerConfig nettyServerConfig;
private ServerConfig mqttServerConfig;
private ExecutorService publicExecutor;
private ChannelEventListener channelEventListener;
......@@ -89,32 +89,32 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
super();
}
public MqttRemotingServer(final ServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
public MqttRemotingServer(final ServerConfig mqttServerConfig) {
this(mqttServerConfig, null);
}
public MqttRemotingServer(final ServerConfig nettyServerConfig,
public MqttRemotingServer(final ServerConfig mqttServerConfig,
final ChannelEventListener channelEventListener) {
init(nettyServerConfig, channelEventListener);
init(mqttServerConfig, channelEventListener);
}
@Override
public RemotingServer init(ServerConfig serverConfig,
ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.mqttServerConfig = serverConfig;
super.init(mqttServerConfig.getServerOnewaySemaphoreValue(),
mqttServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
int publicThreadNums = mqttServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "MqttRemoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
if (JvmUtils.isUseEpoll() && this.mqttServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
......@@ -134,7 +134,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getMqttListenPort();
this.port = mqttServerConfig.getListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
......@@ -169,9 +169,9 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
mqttServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
mqttServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
......@@ -184,11 +184,11 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
new IdleStateHandler(mqttServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
mqttServerConfig
.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig
mqttServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
......@@ -197,7 +197,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
if (mqttServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
......
......@@ -45,5 +45,4 @@ public class EncodeDecodeDispatcher {
public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
return encodeDecodeDispatcher;
}
}
......@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
......@@ -53,7 +54,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload());
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttConnectMessage) mqttMessage).payload()));
return requestCommand;
}
......
......@@ -18,6 +18,7 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
......@@ -48,9 +49,12 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
//invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead
requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy());
ByteBuf payload = ((MqttPublishMessage) mqttMessage).payload();
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
requestCommand.setBody(body);
return requestCommand;
}
@Override
......@@ -60,6 +64,6 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload());
new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), Unpooled.copiedBuffer(remotingCommand.getBody()));
}
}
......@@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
......@@ -43,6 +44,6 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload());
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) MqttEncodeDecodeUtil.decode(remotingCommand.getBody(),MqttSubAckPayload.class));
}
}
......@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
......@@ -44,7 +45,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload());
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttSubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
......
......@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
......@@ -42,7 +43,7 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader);
requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload());
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttUnsubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.util;
import com.google.gson.Gson;
import java.nio.charset.Charset;
public class MqttEncodeDecodeUtil {
private static final Gson GSON = new Gson();
public static byte[] encode(Object object) {
final String json = GSON.toJson(object);
if (json != null) {
return json.getBytes(Charset.forName("UTF-8"));
}
return null;
}
public static <T> Object decode(byte[] body, Class<T> classOfT) {
final String json = new String(body, Charset.forName("UTF-8"));
return GSON.fromJson(json, classOfT);
}
}
......@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -85,8 +86,11 @@ public class SnodeController {
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final MqttConfig mqttConfig;
private final ServerConfig nettyServerConfig;
private final ClientConfig nettyClientConfig;
private final ServerConfig mqttServerConfig;
private final ClientConfig mqttClientConfig;
private RemotingClient remotingClient;
private RemotingServer snodeServer;
private RemotingClient mqttRemotingClient;
......@@ -125,12 +129,13 @@ public class SnodeController {
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) {
this.nettyClientConfig = snodeConfig.getNettyClientConfig();
this.nettyServerConfig = snodeConfig.getNettyServerConfig();
this.mqttServerConfig = mqttConfig.getMqttServerConfig();
this.mqttClientConfig = mqttConfig.getMqttClientConfig();
this.snodeConfig = snodeConfig;
this.mqttConfig = mqttConfig;
if (!this.snodeConfig.isEmbeddedModeEnable()) {
this.enodeService = new RemoteEnodeServiceImpl(this);
} else {
......@@ -144,8 +149,9 @@ public class SnodeController {
}
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingClient != null) {
this.mqttRemotingClient.init(this.getNettyClientConfig(), null);
this.mqttRemotingClient.init(this.mqttClientConfig, null);
}
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
......@@ -185,12 +191,12 @@ public class SnodeController {
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
mqttConfig.getHandleMqttMessageMinPoolSize(),
mqttConfig.getHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
new ArrayBlockingQueue<>(mqttConfig.getHandleMqttThreadPoolQueueCapacity()),
"handleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
......@@ -224,6 +230,10 @@ public class SnodeController {
return snodeConfig;
}
public MqttConfig getMqttConfig() {
return mqttConfig;
}
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
......@@ -251,7 +261,7 @@ public class SnodeController {
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
registerProcessor();
......@@ -350,7 +360,6 @@ public class SnodeController {
}
public void start() {
initialize();
if (snodeServer != null) {
this.snodeServer.start();
}
......
......@@ -20,7 +20,9 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
......@@ -32,6 +34,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -48,19 +51,22 @@ import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
private static InternalLogger log;
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties";
private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE);
public static void main(String[] args) throws IOException, JoranException {
SnodeConfig snodeConfig = loadConfig(args);
MqttConfig mqttConfig = loadMqttConfig(snodeConfig);
if (snodeConfig.isEmbeddedModeEnable()) {
BrokerController brokerController = BrokerStartup.createBrokerController(args);
BrokerStartup.start(brokerController);
snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName());
}
SnodeController snodeController = createSnodeController(snodeConfig);
SnodeController snodeController = createSnodeController(snodeConfig, mqttConfig);
startup(snodeController);
}
......@@ -94,7 +100,6 @@ public class SnodeStartup {
SnodeConfig snodeConfig = new SnodeConfig();
final ServerConfig nettyServerConfig = new ServerConfig();
final ClientConfig nettyClientConfig = new ClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
......@@ -142,12 +147,25 @@ public class SnodeStartup {
return snodeConfig;
}
public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException {
public static MqttConfig loadMqttConfig(SnodeConfig snodeConfig) throws IOException {
MqttConfig mqttConfig = new MqttConfig();
final ServerConfig mqttServerConfig = new ServerConfig();
final ClientConfig mqttClientConfig = new ClientConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
String file = snodeConfig.getRocketmqHome() + File.separator + mqttConfigFileName;
loadMqttProperties(file, mqttServerConfig, mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
mqttConfig.setMqttClientConfig(mqttClientConfig);
MixAll.printObjectProperties(log, mqttConfig);
MixAll.printObjectProperties(log, mqttConfig.getMqttServerConfig());
MixAll.printObjectProperties(log, mqttConfig.getMqttClientConfig());
return mqttConfig;
}
public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException {
final SnodeController snodeController = new SnodeController(
snodeConfig.getNettyServerConfig(),
snodeConfig.getNettyClientConfig(),
snodeConfig);
final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig);
boolean initResult = snodeController.initialize();
if (!initResult) {
......@@ -199,5 +217,21 @@ public class SnodeStartup {
return options;
}
private static void loadMqttProperties(String file, ServerConfig mqttServerConfig,
ClientConfig mqttClientConfig) throws IOException {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(file));
Properties properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, mqttServerConfig);
MixAll.properties2Object(properties, mqttClientConfig);
in.close();
} catch (FileNotFoundException e) {
log.info("The mqtt config file is not found. filePath={}", file);
}
}
}
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.snode.processor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
......@@ -41,6 +41,7 @@ import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
......@@ -98,16 +99,18 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
break;
case PUBLISH:
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload());
mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
break;
case SUBSCRIBE:
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId());
mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload());
MqttSubscribePayload mqttSubscribePayload = (MqttSubscribePayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttSubscribePayload.class);
mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
break;
case UNSUBSCRIBE:
case PINGREQ:
......
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
......@@ -104,7 +105,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
List<Integer> grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager);
//Publish retained messages to subscribers.
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss);
command.setPayload(mqttSubAckPayload);
command.setBody(MqttEncodeDecodeUtil.encode(mqttSubAckPayload));
mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size());
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
......
......@@ -53,12 +53,12 @@ public class MqttPushServiceImpl {
public MqttPushServiceImpl(final SnodeController snodeController) {
this.snodeController = snodeController;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(),
this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(),
this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(),
this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()),
"SnodePushMqttMessageThread",
new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()),
"pushMqttMessageThread",
false);
}
......@@ -106,12 +106,14 @@ public class MqttPushServiceImpl {
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
requestCommand.setPayload(message.copy());
byte[] body = new byte[message.readableBytes()];
message.readBytes(body);
requestCommand.setBody(body);
snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
}finally {
} finally {
System.out.println("Release Bytebuf");
ReferenceCountUtil.release(message);
}
......@@ -136,7 +138,6 @@ public class MqttPushServiceImpl {
mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
// pushMessage.setPayload(message);
return pushMessage;
}
......
......@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.junit.Test;
......@@ -29,10 +29,7 @@ public class SnodeControllerTest {
public void testSnodeRestart() {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setListenPort(10912);
SnodeController snodeController = new SnodeController(
serverConfig,
new ClientConfig(),
new SnodeConfig());
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
......@@ -44,10 +41,7 @@ public class SnodeControllerTest {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setAclEnable(true);
SnodeController snodeController = new SnodeController(
new ServerConfig(),
new ClientConfig(),
snodeConfig);
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
......
......@@ -20,14 +20,14 @@ import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before;
import org.junit.Test;
......@@ -41,7 +41,7 @@ public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
......@@ -88,7 +88,7 @@ public class DefaultMqttMessageProcessorTest {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
request.setPayload(payload);
request.setBody(MqttEncodeDecodeUtil.encode(payload));
return request;
}
}
......@@ -22,10 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
......@@ -42,12 +41,10 @@ public class MqttConnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(
new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()));
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig()));
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes()));
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes()));
mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel);
}
......
......@@ -20,11 +20,10 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
......@@ -43,8 +42,7 @@ public class MqttDisconnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController);
Client client = new Client();
......
......@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -48,7 +47,7 @@ public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
......@@ -56,7 +55,7 @@ public class SendMessageProcessorTest {
private String topic = "snodeTopic";
private String group = "snodeGroup";
@Mock
private EnodeService enodeService;
......
......@@ -18,9 +18,8 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -46,7 +45,7 @@ import static org.mockito.Mockito.when;
public class NnodeServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NettyRemotingClient remotingClient;
......
......@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
......@@ -56,7 +55,7 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NnodeService nnodeService;
......
......@@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
......@@ -38,7 +37,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SlowConsumerServiceImplTest {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private final String enodeName = "testEndoe";
......
......@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
......@@ -33,8 +32,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class WillMessageServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private WillMessageService willMessageService;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册