未验证 提交 a1f4358e 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #842 from xiangwangcheng/snode

[RIP-11]MQTT:completing  pub and sub logic of qos=0 messages(single node only )
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
public class MqttConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private ServerConfig mqttServerConfig;
private ClientConfig mqttClientConfig;
private int handleMqttThreadPoolQueueCapacity = 10000;
private int handleMqttMessageMinPoolSize = 10;
private int handleMqttMessageMaxPoolSize = 20;
private int pushMqttMessageMinPoolSize = 10;
private int pushMqttMessageMaxPoolSize = 20;
private int pushMqttMessageThreadPoolQueueCapacity = 10000;
private int listenPort = 1883;
/**
* Acl feature switch
*/
@ImportantField
private boolean aclEnable = false;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public boolean isAclEnable() {
return aclEnable;
}
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public ServerConfig getMqttServerConfig() {
return mqttServerConfig;
}
public void setMqttServerConfig(ServerConfig mqttServerConfig) {
this.mqttServerConfig = mqttServerConfig;
}
public ClientConfig getMqttClientConfig() {
return mqttClientConfig;
}
public void setMqttClientConfig(ClientConfig mqttClientConfig) {
this.mqttClientConfig = mqttClientConfig;
}
public int getHandleMqttThreadPoolQueueCapacity() {
return handleMqttThreadPoolQueueCapacity;
}
public void setHandleMqttThreadPoolQueueCapacity(int handleMqttThreadPoolQueueCapacity) {
this.handleMqttThreadPoolQueueCapacity = handleMqttThreadPoolQueueCapacity;
}
public int getHandleMqttMessageMinPoolSize() {
return handleMqttMessageMinPoolSize;
}
public void setHandleMqttMessageMinPoolSize(int handleMqttMessageMinPoolSize) {
this.handleMqttMessageMinPoolSize = handleMqttMessageMinPoolSize;
}
public int getHandleMqttMessageMaxPoolSize() {
return handleMqttMessageMaxPoolSize;
}
public void setHandleMqttMessageMaxPoolSize(int handleMqttMessageMaxPoolSize) {
this.handleMqttMessageMaxPoolSize = handleMqttMessageMaxPoolSize;
}
public int getPushMqttMessageMinPoolSize() {
return pushMqttMessageMinPoolSize;
}
public void setPushMqttMessageMinPoolSize(int pushMqttMessageMinPoolSize) {
this.pushMqttMessageMinPoolSize = pushMqttMessageMinPoolSize;
}
public int getPushMqttMessageMaxPoolSize() {
return pushMqttMessageMaxPoolSize;
}
public void setPushMqttMessageMaxPoolSize(int pushMqttMessageMaxPoolSize) {
this.pushMqttMessageMaxPoolSize = pushMqttMessageMaxPoolSize;
}
public int getPushMqttMessageThreadPoolQueueCapacity() {
return pushMqttMessageThreadPoolQueueCapacity;
}
public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) {
this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity;
}
}
......@@ -56,16 +56,10 @@ public class SnodeConfig {
private int snodeSendThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private int snodeSendMessageMinPoolSize = 10;
private int snodeSendMessageMaxPoolSize = 20;
private int snodeHandleMqttMessageMinPoolSize = 10;
private int snodeHandleMqttMessageMaxPoolSize = 20;
private int snodeHeartBeatCorePoolSize = 1;
private int snodeHeartBeatMaxPoolSize = 2;
......@@ -230,14 +224,6 @@ public class SnodeConfig {
this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
}
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(
int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
public int getSnodeSendMessageMinPoolSize() {
return snodeSendMessageMinPoolSize;
......@@ -279,22 +265,6 @@ public class SnodeConfig {
this.snodeId = snodeId;
}
public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize;
}
public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
}
public int getSnodeHandleMqttMessageMaxPoolSize() {
return snodeHandleMqttMessageMaxPoolSize;
}
public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
}
public String getSnodeName() {
return snodeName;
}
......
......@@ -68,8 +68,6 @@ public class MqttSubscriptionData extends SubscriptionData {
if (getClass() != obj.getClass())
return false;
MqttSubscriptionData other = (MqttSubscriptionData) obj;
if (qos != other.qos)
return false;
if (clientId != other.clientId) {
return false;
}
......
......@@ -62,5 +62,9 @@
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttSampleConsumer {
private static Logger log = LoggerFactory.getLogger(MqttSampleConsumer.class);
public static void main(String[] args) throws InterruptedException {
String topic = "mqtt-sample";
int qos = 0;
String broker = "tcp://127.0.0.1:1883";
String clinetId = "JavaSampleConsumer";
MemoryPersistence persistence = new MemoryPersistence();
{
try {
MqttClient sampleClient = new MqttClient(broker, clinetId, persistence);
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
connectOptions.setKeepAliveInterval(6000);
log.info("Connecting to broker: " + broker);
sampleClient.connect(connectOptions);
log.info("Connected");
sampleClient.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) {
System.out.println("connection lost." + throwable.getLocalizedMessage());
}
@Override public void messageArrived(String s, MqttMessage message) throws Exception {
System.out.println(message.toString());
// System.exit(0);
}
@Override public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("delivery complete." + token.getMessage().toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
});
log.info("Subscribing topic: " + topic);
sampleClient.subscribe(topic, qos);
log.info("Subsrcribe success.");
Thread.sleep(100000000);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
me.printStackTrace();
me.printStackTrace();
System.exit(1);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttSampleProducer {
private static Logger log = LoggerFactory.getLogger(MqttSampleProducer.class);
public static void main(String[] args) throws InterruptedException {
String topic = "mqtt-sample";
String messageContent = "hello mqtt";
int qos = 0;
String broker = "tcp://127.0.0.1:1883";
String clientId = "JavaSampleProducer";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(6000);
log.info("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
log.info("Connected");
log.info("Publishing message: " + messageContent);
MqttMessage message = new MqttMessage(messageContent.getBytes());
message.setQos(qos);
message.setRetained(true);
sampleClient.publish(topic, message);
log.info("Message published");
/*sampleClient.disconnect();
log.info("Disconnected");*/
Thread.sleep(10000000);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
me.printStackTrace();
System.exit(1);
}
}
}
......@@ -648,6 +648,16 @@
<artifactId>simpleclient_hotspot</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>LATEST</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -15,7 +15,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -38,8 +39,8 @@
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
......@@ -62,5 +63,9 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>
......@@ -20,7 +20,6 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
public class ServerConfig implements Cloneable {
private int listenPort = 8888;
private int mqttListenPort = 1883;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
......@@ -76,14 +75,6 @@ public class ServerConfig implements Cloneable {
this.listenPort = listenPort;
}
public int getMqttListenPort() {
return mqttListenPort;
}
public void setMqttListenPort(int mqttListenPort) {
this.mqttListenPort = mqttListenPort;
}
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
......
......@@ -42,7 +42,7 @@ public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder<
if (!(msg instanceof MqttMessage)) {
return;
}
RemotingCommand requestCommand = null;
RemotingCommand requestCommand;
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType());
if (message2MessageEncodeDecode == null) {
......
......@@ -60,7 +60,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ClientConfig nettyClientConfig;
private ClientConfig mqttClientConfig;
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker;
......@@ -76,33 +76,38 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
public MqttRemotingClient() {
super();
loadProperties();
}
public MqttRemotingClient(final ClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
private void loadProperties() {
}
public MqttRemotingClient(final ClientConfig mqttClientConfig) {
this(mqttClientConfig, null);
}
public MqttRemotingClient(final ClientConfig nettyClientConfig,
public MqttRemotingClient(final ClientConfig mqttClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
init(nettyClientConfig, channelEventListener);
super(mqttClientConfig.getClientOnewaySemaphoreValue(), mqttClientConfig.getClientAsyncSemaphoreValue());
init(mqttClientConfig, channelEventListener);
}
@Override
public RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig;
public RemotingClient init(ClientConfig mqttClientConfig, ChannelEventListener channelEventListener) {
this.mqttClientConfig = mqttClientConfig;
this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
nettyClientConfig.getClientWorkerThreads()));
this.eventLoopGroupWorker = new NioEventLoopGroup(mqttClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
mqttClientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
nettyClientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
if (nettyClientConfig.isUseTLS()) {
mqttClientConfig.getClientCallbackExecutorThreads(),
10000, "MqttRemoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(mqttClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyClientWorkerThreads", mqttClientConfig.getClientWorkerThreads()));
if (mqttClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
log.info("SSL enabled for mqtt client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
......@@ -123,14 +128,14 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mqttClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, mqttClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, mqttClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (mqttClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
......@@ -142,7 +147,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
defaultEventExecutorGroup,
MqttEncoder.INSTANCE,
new MqttDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new IdleStateHandler(0, 0, mqttClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
......@@ -199,7 +204,7 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
throw (RemotingSendRequestException) remotingException;
}
if (remotingException instanceof RemotingTimeoutException) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
if (mqttClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
......
......@@ -71,7 +71,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
private ServerConfig nettyServerConfig;
private ServerConfig mqttServerConfig;
private ExecutorService publicExecutor;
private ChannelEventListener channelEventListener;
......@@ -89,32 +89,32 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
super();
}
public MqttRemotingServer(final ServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
public MqttRemotingServer(final ServerConfig mqttServerConfig) {
this(mqttServerConfig, null);
}
public MqttRemotingServer(final ServerConfig nettyServerConfig,
public MqttRemotingServer(final ServerConfig mqttServerConfig,
final ChannelEventListener channelEventListener) {
init(nettyServerConfig, channelEventListener);
init(mqttServerConfig, channelEventListener);
}
@Override
public RemotingServer init(ServerConfig serverConfig,
ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.mqttServerConfig = serverConfig;
super.init(mqttServerConfig.getServerOnewaySemaphoreValue(),
mqttServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
int publicThreadNums = mqttServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "MqttRemoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
if (JvmUtils.isUseEpoll() && this.mqttServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
......@@ -134,7 +134,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getMqttListenPort();
this.port = mqttServerConfig.getListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
......@@ -169,9 +169,9 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
mqttServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
mqttServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
......@@ -184,11 +184,11 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
new IdleStateHandler(mqttServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
mqttServerConfig
.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig
mqttServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
......@@ -197,7 +197,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
if (mqttServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
......
......@@ -43,8 +43,8 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<
if (!(msg instanceof RemotingCommand)) {
return;
}
MqttMessage mqttMessage = null;
MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class);
MqttMessage mqttMessage;
MqttHeader mqttHeader = (MqttHeader) msg.readCustomHeader();
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(
MqttMessageType.valueOf(mqttHeader.getMessageType()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttConnectMessage}
*/
public final class RocketMQMqttConnectPayload extends RemotingSerializable {
private String clientIdentifier;
private String willTopic;
private String willMessage;
private String userName;
private String password;
public RocketMQMqttConnectPayload(
String clientIdentifier,
String willTopic,
String willMessage,
String userName,
String password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
}
public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) {
return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(),
payload.willMessage(), payload.userName(), payload.password());
}
public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException {
return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes(
RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET));
}
public String getClientIdentifier() {
return clientIdentifier;
}
public String getWillTopic() {
return willTopic;
}
public String getWillMessage() {
return willMessage;
}
public String getUserName() {
return userName;
}
public String getPassword() {
return password;
}
public void setClientIdentifier(String clientIdentifier) {
this.clientIdentifier = clientIdentifier;
}
public void setWillTopic(String willTopic) {
this.willTopic = willTopic;
}
public void setWillMessage(String willMessage) {
this.willMessage = willMessage;
}
public void setUserName(String userName) {
this.userName = userName;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("clientIdentifier=").append(clientIdentifier)
.append(", willTopic=").append(willTopic)
.append(", willMessage=").append(willMessage)
.append(", userName=").append(userName)
.append(", password=").append(password)
.append(']')
.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttSubscribeMessage}
*/
public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
private List<MqttTopicSubscription> topicSubscriptions;
public RocketMQMqttSubscribePayload(List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
}
public List<MqttTopicSubscription> getTopicSubscriptions() {
return topicSubscriptions;
}
public void setTopicSubscriptions(
List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = topicSubscriptions;
}
public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) {
return new RocketMQMqttSubscribePayload(payload.topicSubscriptions());
}
public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException {
return new MqttSubscribePayload(this.topicSubscriptions);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topicSubscriptions.size() - 1; i++) {
builder.append(topicSubscriptions.get(i)).append(", ");
}
builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1));
builder.append(']');
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage}
*/
public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
private List<String> topics;
public RocketMQMqttUnSubscribePayload(List<String> topics) {
this.topics = topics;
}
public List<String> getTopics() {
return topics;
}
public void setTopics(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
}
public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) {
return new RocketMQMqttUnSubscribePayload(payload.topics());
}
public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException {
return new MqttUnsubscribePayload(this.topics);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topics.size() - 1; i++) {
builder.append("topicName = ").append(topics.get(i)).append(", ");
}
builder.append("topicName = ").append(topics.get(topics.size() - 1))
.append(']');
return builder.toString();
}
}
......@@ -29,15 +29,15 @@ public class EncodeDecodeDispatcher {
encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, new MqttPublishEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PUBACK, new MqttPubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, new MqttSubscribeEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
}
......@@ -45,5 +45,4 @@ public class EncodeDecodeDispatcher {
public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
return encodeDecodeDispatcher;
}
}
......@@ -18,6 +18,7 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -25,5 +26,5 @@ public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage);
MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException;
MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException;
}
......@@ -21,18 +21,15 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
.fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage
.variableHeader();
......@@ -57,9 +54,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttConnectMessage) mqttMessage).payload()));
return requestCommand;
}
......
......@@ -37,9 +37,7 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader)remotingCommand.readCustomHeader();
return new MqttConnAckMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
......
......@@ -36,8 +36,7 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
......
......@@ -18,11 +18,13 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
......@@ -30,13 +32,9 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload();
byte[] payload = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(payload);
RemotingCommand requestCommand = null;
RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage
MqttPublishVariableHeader variableHeader = (MqttPublishVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
......@@ -46,19 +44,26 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name());
mqttHeader.setSessionPresent(variableHeader.isSessionPresent());
mqttHeader.setTopicName(variableHeader.topicName());
mqttHeader.setPacketId(variableHeader.packetId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload);
ByteBuf payload = ((MqttPublishMessage) mqttMessage).payload();
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
requestCommand.setBody(body);
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttPublishMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), Unpooled.copiedBuffer(remotingCommand.getBody()));
}
}
......@@ -24,9 +24,11 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
......@@ -36,14 +38,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttSubAckMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload());
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) MqttEncodeDecodeUtil.decode(remotingCommand.getBody(),MqttSubAckPayload.class));
}
}
......@@ -21,19 +21,15 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload
.fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
......@@ -49,9 +45,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttSubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
......
......@@ -36,8 +36,7 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttUnsubAckMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
......
......@@ -21,19 +21,15 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload
.fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
......@@ -44,14 +40,10 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader);
requestCommand.setBody(MqttEncodeDecodeUtil.encode(((MqttUnsubscribeMessage) mqttMessage).payload()));
return requestCommand;
}
......
......@@ -15,45 +15,24 @@
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
package org.apache.rocketmq.remoting.util;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import com.google.gson.Gson;
import java.nio.charset.Charset;
/**
* Payload of {@link MqttSubAckMessage}
*/
public final class RocketMQMqttSubAckPayload extends RemotingSerializable {
private List<Integer> grantedQoSLevels;
public RocketMQMqttSubAckPayload(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = Collections.unmodifiableList(grantedQoSLevels);
}
public List<Integer> getGrantedQoSLevels() {
return grantedQoSLevels;
}
public void setGrantedQoSLevels(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = grantedQoSLevels;
}
public static RocketMQMqttSubAckPayload fromMqttSubAckPayload(MqttSubAckPayload payload) {
return new RocketMQMqttSubAckPayload(payload.grantedQoSLevels());
}
public class MqttEncodeDecodeUtil {
private static final Gson GSON = new Gson();
public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException {
return new MqttSubAckPayload(this.grantedQoSLevels);
public static byte[] encode(Object object) {
final String json = GSON.toJson(object);
if (json != null) {
return json.getBytes(Charset.forName("UTF-8"));
}
return null;
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']';
public static <T> Object decode(byte[] body, Class<T> classOfT) {
final String json = new String(body, Charset.forName("UTF-8"));
return GSON.fromJson(json, classOfT);
}
}
......@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -72,6 +73,7 @@ import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
......@@ -84,8 +86,11 @@ public class SnodeController {
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final MqttConfig mqttConfig;
private final ServerConfig nettyServerConfig;
private final ClientConfig nettyClientConfig;
private final ServerConfig mqttServerConfig;
private final ClientConfig mqttClientConfig;
private RemotingClient remotingClient;
private RemotingServer snodeServer;
private RemotingClient mqttRemotingClient;
......@@ -118,17 +123,19 @@ public class SnodeController {
private SlowConsumerService slowConsumerService;
private MetricsService metricsService;
private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) {
this.nettyClientConfig = snodeConfig.getNettyClientConfig();
this.nettyServerConfig = snodeConfig.getNettyServerConfig();
this.mqttServerConfig = mqttConfig.getMqttServerConfig();
this.mqttClientConfig = mqttConfig.getMqttClientConfig();
this.snodeConfig = snodeConfig;
this.mqttConfig = mqttConfig;
if (!this.snodeConfig.isEmbeddedModeEnable()) {
this.enodeService = new RemoteEnodeServiceImpl(this);
} else {
......@@ -136,11 +143,16 @@ public class SnodeController {
}
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
.init(this.getNettyClientConfig(), null);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient();
if (this.remotingClient != null) {
this.remotingClient.init(this.getNettyClientConfig(), null);
}
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingClient != null) {
this.mqttRemotingClient.init(this.mqttClientConfig, null);
}
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
......@@ -179,12 +191,12 @@ public class SnodeController {
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
mqttConfig.getHandleMqttMessageMinPoolSize(),
mqttConfig.getHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
new ArrayBlockingQueue<>(mqttConfig.getHandleMqttThreadPoolQueueCapacity()),
"handleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
......@@ -211,12 +223,17 @@ public class SnodeController {
this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this);
this.mqttPushService = new MqttPushServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
return snodeConfig;
}
public MqttConfig getMqttConfig() {
return mqttConfig;
}
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
......@@ -233,15 +250,21 @@ public class SnodeController {
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
initAclInterceptorGroup();
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer();
if (this.snodeServer != null) {
this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
registerProcessor();
return true;
}
......@@ -312,31 +335,41 @@ public class SnodeController {
}
public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
private void registerProcessor() {
if (snodeServer != null) {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
}
if (mqttRemotingServer != null) {
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
}
public void start() {
initialize();
this.snodeServer.start();
this.mqttRemotingServer.start();
if (snodeServer != null) {
this.snodeServer.start();
}
if (mqttRemotingServer != null) {
this.mqttRemotingServer.start();
}
this.remotingClient.start();
this.mqttRemotingClient.start();
if (mqttRemotingClient != null) {
this.mqttRemotingClient.start();
}
this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
this.metricsService.start(this.snodeConfig.getMetricsExportPort());
......@@ -526,4 +559,12 @@ public class SnodeController {
WillMessageService willMessageService) {
this.willMessageService = willMessageService;
}
public MqttPushServiceImpl getMqttPushService() {
return mqttPushService;
}
public void setMqttPushService(MqttPushServiceImpl mqttPushService) {
this.mqttPushService = mqttPushService;
}
}
......@@ -20,7 +20,9 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
......@@ -32,12 +34,14 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -47,19 +51,22 @@ import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
private static InternalLogger log;
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties";
private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE);
public static void main(String[] args) throws IOException, JoranException {
SnodeConfig snodeConfig = loadConfig(args);
MqttConfig mqttConfig = loadMqttConfig(snodeConfig);
if (snodeConfig.isEmbeddedModeEnable()) {
BrokerController brokerController = BrokerStartup.createBrokerController(args);
BrokerStartup.start(brokerController);
snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName());
}
SnodeController snodeController = createSnodeController(snodeConfig);
SnodeController snodeController = createSnodeController(snodeConfig, mqttConfig);
startup(snodeController);
}
......@@ -93,7 +100,6 @@ public class SnodeStartup {
SnodeConfig snodeConfig = new SnodeConfig();
final ServerConfig nettyServerConfig = new ServerConfig();
final ClientConfig nettyClientConfig = new ClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
......@@ -119,18 +125,47 @@ public class SnodeStartup {
System.exit(-2);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), snodeConfig);
String namesrvAddr = snodeConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
MixAll.printObjectProperties(log, snodeConfig);
MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
return snodeConfig;
}
public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException {
public static MqttConfig loadMqttConfig(SnodeConfig snodeConfig) throws IOException {
MqttConfig mqttConfig = new MqttConfig();
final ServerConfig mqttServerConfig = new ServerConfig();
final ClientConfig mqttClientConfig = new ClientConfig();
mqttServerConfig.setListenPort(mqttConfig.getListenPort());
String file = snodeConfig.getRocketmqHome() + File.separator + mqttConfigFileName;
loadMqttProperties(file, mqttServerConfig, mqttClientConfig);
mqttConfig.setMqttServerConfig(mqttServerConfig);
mqttConfig.setMqttClientConfig(mqttClientConfig);
MixAll.printObjectProperties(log, mqttConfig);
MixAll.printObjectProperties(log, mqttConfig.getMqttServerConfig());
MixAll.printObjectProperties(log, mqttConfig.getMqttClientConfig());
return mqttConfig;
}
public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException {
final SnodeController snodeController = new SnodeController(
snodeConfig.getNettyServerConfig(),
snodeConfig.getNettyClientConfig(),
snodeConfig);
final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig);
boolean initResult = snodeController.initialize();
if (!initResult) {
......@@ -182,5 +217,21 @@ public class SnodeStartup {
return options;
}
private static void loadMqttProperties(String file, ServerConfig mqttServerConfig,
ClientConfig mqttClientConfig) throws IOException {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(file));
Properties properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, mqttServerConfig);
MixAll.properties2Object(properties, mqttClientConfig);
in.close();
} catch (FileNotFoundException e) {
log.info("The mqtt config file is not found. filePath={}", file);
}
}
}
......@@ -44,6 +44,8 @@ public class Client {
private boolean cleanSession;
private boolean willFlag;
private String snodeAddress;
public ClientRole getClientRole() {
......@@ -71,13 +73,14 @@ public class Client {
language == client.language &&
isConnected == client.isConnected &&
cleanSession == client.cleanSession &&
willFlag == client.willFlag &&
snodeAddress == client.snodeAddress;
}
@Override
public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval,
lastUpdateTimestamp, version, language, isConnected, cleanSession, snodeAddress);
lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress);
}
public RemotingChannel getRemotingChannel() {
......@@ -144,6 +147,14 @@ public class Client {
this.cleanSession = cleanSession;
}
public boolean isWillFlag() {
return willFlag;
}
public void setWillFlag(boolean willFlag) {
this.willFlag = willFlag;
}
public String getSnodeAddress() {
return snodeAddress;
}
......@@ -173,6 +184,7 @@ public class Client {
", language=" + language +
", isConnected=" + isConnected +
", cleanSession=" + cleanSession +
", willFlag=" + willFlag +
", snodeAddress=" + snodeAddress +
'}';
}
......
......@@ -43,7 +43,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void start(long interval) {
this.producerManager.startScan(interval);
this.consumerManager.startScan(interval);
this.iotClientManager.startScan(interval);
// this.iotClientManager.startScan(interval);
}
public void shutdown() {
......
......@@ -144,7 +144,7 @@ public abstract class ClientManagerImpl implements ClientManager {
return updated;
}
private void removeClient(String groupId, RemotingChannel remotingChannel) {
protected void removeClient(String groupId, RemotingChannel remotingChannel) {
ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(groupId);
if (channelTable != null) {
Client prev = channelTable.remove(remotingChannel);
......@@ -211,6 +211,9 @@ public abstract class ClientManagerImpl implements ClientManager {
}
ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable
.get(groupId);
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel());
}
return channelClientMap.get(remotingChannel);
}
}
......@@ -19,9 +19,10 @@ package org.apache.rocketmq.snode.client.impl;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -35,7 +36,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOT_GROUP = "IOT_GROUP";
private final SnodeController snodeController;
private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
1024);
private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
......@@ -47,9 +48,25 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
//do the logic when client sends unsubscribe packet.
}
@Override public void onClose(Set<String> groups, RemotingChannel remotingChannel) {
for (String groupId : groups) {
//remove client after invoking onClosed method(client may be used in onClosed)
onClosed(groupId, remotingChannel);
removeClient(groupId, remotingChannel);
}
}
@Override
public void onClosed(String group, RemotingChannel remotingChannel) {
//do the logic when connection is closed by any reason.
//step1. Clean subscription data if cleanSession=1
Client client = this.getClient(IOT_GROUP, remotingChannel);
if (client.isCleanSession()) {
cleanSessionState(client.getClientId());
}
//step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.)
//step3. If will retain is true, add the will message to retain message.
}
@Override
......@@ -63,10 +80,10 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void cleanSessionState(String clientId) {
clientId2Subscription.remove(clientId);
for (Iterator<Map.Entry<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> next = iterator.next();
for (Iterator<Map.Entry<Client, List<MqttSubscriptionData>>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
Map.Entry<Client, List<MqttSubscriptionData>> next1 = iterator1.next();
for (Iterator<Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> next = iterator.next();
for (Iterator<Map.Entry<Client, Set<SubscriptionData>>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
Map.Entry<Client, Set<SubscriptionData>> next1 = iterator1.next();
if (!next1.getKey().getClientId().equals(clientId)) {
continue;
}
......@@ -84,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
return snodeController;
}
public ConcurrentHashMap<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> getTopic2SubscriptionTable() {
public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() {
return topic2SubscriptionTable;
}
......
......@@ -17,15 +17,20 @@
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -33,11 +38,10 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
......@@ -83,20 +87,33 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null;
switch (fixedHeader.messageType()) {
case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload());
// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
break;
case PUBLISH:
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
break;
case SUBSCRIBE:
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId());
MqttSubscribePayload mqttSubscribePayload = (MqttSubscribePayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttSubscribePayload.class);
mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
break;
case UNSUBSCRIBE:
case PINGREQ:
case DISCONNECT:
}
return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
......@@ -107,11 +124,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
return false;
}
private <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET));
return JSON.parseObject(json, classOfT);
}
private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler);
}
......
......@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
......@@ -92,6 +93,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
}
//treat a second CONNECT packet as a protocol violation and disconnect
if (isConnected(remotingChannel, payload.clientIdentifier())) {
log.error("This client has been connected. The second CONNECT packet is treated as a protocol vialation and the connection will be closed.");
remotingChannel.close();
return null;
}
......@@ -118,6 +120,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
Client client = new Client();
client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT);
client.setGroups(new HashSet<String>(){{add("IOT_GROUP");}});
client.setConnected(true);
client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis());
......
......@@ -62,7 +62,9 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
}
client.setConnected(false);
remotingChannel.close();
if (remotingChannel.isActive()) {
remotingChannel.close();
}
return null;
}
}
......@@ -17,13 +17,27 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPublishMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) {
......@@ -32,7 +46,40 @@ public class MqttPublishMessageHandler implements MessageHandler {
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
if (!(message instanceof MqttPublishMessage)) {
log.error("Wrong message type! Expected type: PUBLISH but {} was received.", message.fixedHeader().messageType());
throw new WrongMessageTypeException("Wrong message type exception.");
}
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) {
log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
ByteBuf payload = mqttPublishMessage.payload();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
//Push messages to subscribers and add it to IN-FLIGHT messages
}
if (fixedHeader.qosLevel().value() > 0) {
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(2);
mqttHeader.setPacketId(0);
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
//PUBREC/PUBREL/PUBCOMP
}
return command;
}
return null;
}
}
......@@ -25,7 +25,9 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -36,7 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubAckPayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
......@@ -80,6 +82,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
if (payload.topicSubscriptions() == null || payload.topicSubscriptions().size() == 0) {
log.error("The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
if (isQosLegal(payload.topicSubscriptions())) {
log.error("The QoS level of Topic Filter / QoS pairs should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
......@@ -97,12 +100,13 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
// mqttHeader.setRemainingLength(0x02);
mqttHeader.setMessageId(mqttSubscribeMessage.variableHeader().messageId());
List<Integer> grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager);
RocketMQMqttSubAckPayload ackPayload = RocketMQMqttSubAckPayload.fromMqttSubAckPayload(new MqttSubAckPayload(grantQoss));
command.setBody(ackPayload.encode());
//Publish retained messages to subscribers.
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss);
command.setBody(MqttEncodeDecodeUtil.encode(mqttSubAckPayload));
mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size());
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
return command;
......@@ -111,9 +115,9 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
private List<Integer> doSubscribe(Client client, List<MqttTopicSubscription> mqttTopicSubscriptions,
IOTClientManagerImpl iotClientManager) {
//do the logic when client sends subscribe packet.
//1.register clientId2Subscription
//1.update clientId2Subscription
ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
ConcurrentHashMap<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Subscription subscription = null;
if (clientId2Subscription.containsKey(client.getClientId())) {
subscription = clientId2Subscription.get(client.getClientId());
......@@ -128,7 +132,25 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
grantQoss.add(actualQos);
SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName());
subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData);
//2.register topic2SubscriptionTable
//2.update topic2SubscriptionTable
String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
if (client2SubscriptionData == null) {
client2SubscriptionData = new ConcurrentHashMap<>();
ConcurrentHashMap<Client, Set<SubscriptionData>> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
if (prev != null) {
client2SubscriptionData = prev;
}
Set<SubscriptionData> subscriptionDataSet = client2SubscriptionData.get(client);
if (subscriptionDataSet == null) {
subscriptionDataSet = new HashSet<>();
Set<SubscriptionData> prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet);
if (prevSubscriptionDataSet != null) {
subscriptionDataSet = prevSubscriptionDataSet;
}
subscriptionDataSet.add(subscriptionData);
}
}
}
return grantQoss;
}
......
......@@ -17,34 +17,109 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
/**
* handle the UNSUBSCRIBE message from the client
* <ol>
* <li>extract topic filters to be un-subscribed</li>
* <li>get the topics matching with the topic filters</li>
* <li>verify the authorization of the client to the </li>
* <li>remove subscription from the SubscriptionStore</li>
* </ol>
*/
public class MqttUnsubscribeMessagHandler implements MessageHandler {
/* private SubscriptionStore subscriptionStore;
public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
if (!(message instanceof MqttUnsubscribeMessage)) {
log.error("Wrong message type! Expected type: UNSUBSCRIBE but {} was received. MqttMessage={}", message.fixedHeader().messageType(), message.toString());
throw new WrongMessageTypeException("Wrong message type exception.");
}
MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) message;
MqttFixedHeader fixedHeader = unsubscribeMessage.fixedHeader();
if (fixedHeader.isDup() || !fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE) || fixedHeader.isRetain()) {
log.error("Malformed value of reserved bits(bits 3,2,1,0) of fixed header. Expected=0010, received={}{}{}{}", fixedHeader.isDup() ? 1 : 0, Integer.toBinaryString(fixedHeader.qosLevel().value()), fixedHeader.isRetain() ? 1 : 0);
remotingChannel.close();
return null;
}
MqttUnsubscribePayload payload = unsubscribeMessage.payload();
if (payload.topics() == null || payload.topics().size() == 0) {
log.error("The payload of a UNSUBSCRIBE packet MUST contain at least one Topic Filter. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client == null) {
log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
mqttHeader.setMessageType(MqttMessageType.SUBACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setMessageId(unsubscribeMessage.variableHeader().messageId());
doUnsubscribe(client, payload.topics(), iotClientManager);
mqttHeader.setRemainingLength(0x02);
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
return command;
}
private void doUnsubscribe(Client client, List<String> topics, IOTClientManagerImpl iotClientManager) {
ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
for (String topicFilter : topics) {
//1.update clientId2Subscription
if (clientId2Subscription.containsKey(client.getClientId())) {
Subscription subscription = clientId2Subscription.get(client.getClientId());
subscription.getSubscriptionTable().remove(topicFilter);
}
//2.update topic2SubscriptionTable
String rootTopic = MqttUtil.getRootTopic(topicFilter);
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
if (client2SubscriptionData != null) {
Set<SubscriptionData> subscriptionDataSet = client2SubscriptionData.get(client);
if (subscriptionDataSet != null) {
Iterator<SubscriptionData> iterator = subscriptionDataSet.iterator();
while (iterator.hasNext()) {
if (iterator.next().getTopic().equals(topicFilter))
iterator.remove();
}
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.util.ReferenceCountUtil;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPushServiceImpl {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeController snodeController;
private ExecutorService pushMqttMessageExecutorService;
public MqttPushServiceImpl(final SnodeController snodeController) {
this.snodeController = snodeController;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(),
this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()),
"pushMqttMessageThread",
false);
}
public class MqttPushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final ByteBuf message;
private final String topic;
private final Integer qos;
private boolean retain;
private Integer packetId;
public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain,
Integer packetId) {
this.message = message;
this.topic = topic;
this.qos = qos;
this.retain = retain;
this.packetId = packetId;
}
@Override
public void run() {
if (!canceled.get()) {
try {
RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
//find those clients publishing the message to
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Set<Client> clients = new HashSet<>();
if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic));
for (Map.Entry<Client, Set<SubscriptionData>> entry : client2SubscriptionDatas.entrySet()) {
Set<SubscriptionData> subscriptionDatas = entry.getValue();
for (SubscriptionData subscriptionData : subscriptionDatas) {
if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) {
clients.add(entry.getKey());
break;
}
}
}
}
for (Client client : clients) {
RemotingChannel remotingChannel = client.getRemotingChannel();
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
byte[] body = new byte[message.readableBytes()];
message.readBytes(body);
requestCommand.setBody(body);
snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
} finally {
System.out.println("Release Bytebuf");
ReferenceCountUtil.release(message);
}
} else {
log.info("Push message to topic: {} canceled!", topic);
}
}
private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain,
Integer packetId) {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
if (qos == 0) {
mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
} else {
mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
}
mqttHeader.setQosLevel(qos);
mqttHeader.setRetain(retain);
mqttHeader.setPacketId(packetId);
mqttHeader.setTopicName(topic);
mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
return pushMessage;
}
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
}
public void pushMessageQos0(final String topic, final ByteBuf message) {
MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0);
pushMqttMessageExecutorService.submit(pushTask);
}
public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain,
Integer packetId) {
MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId);
pushMqttMessageExecutorService.submit(pushTask);
}
public void shutdown() {
this.pushMqttMessageExecutorService.shutdown();
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.util;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.UUID;
import org.apache.rocketmq.snode.constant.MqttConstant;
......@@ -33,4 +34,33 @@ public class MqttUtil {
public static int actualQos(int qos) {
return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos);
}
public static boolean isQosLegal(MqttQoS qos) {
if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) {
return false;
}
return false;
}
public static boolean isMatch(String topicFiter, String topic) {
if (!topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_PLUS) && !topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_SHARP)) {
return topicFiter.equals(topic);
}
String[] filterTopics = topicFiter.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
String[] actualTopics = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
int i = 0;
for (; i < filterTopics.length && i < actualTopics.length; i++) {
if (MqttConstant.SUBSCRIPTION_FLAG_PLUS.equals(filterTopics[i])) {
continue;
}
if (MqttConstant.SUBSCRIPTION_FLAG_SHARP.equals(filterTopics[i])) {
return true;
}
if (!filterTopics[i].equals(actualTopics[i])) {
return false;
}
}
return i == actualTopics.length;
}
}
......@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.junit.Test;
......@@ -29,10 +29,7 @@ public class SnodeControllerTest {
public void testSnodeRestart() {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setListenPort(10912);
SnodeController snodeController = new SnodeController(
serverConfig,
new ClientConfig(),
new SnodeConfig());
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
......@@ -44,10 +41,7 @@ public class SnodeControllerTest {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setAclEnable(true);
SnodeController snodeController = new SnodeController(
new ServerConfig(),
new ClientConfig(),
snodeConfig);
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
......
......@@ -16,23 +16,18 @@
*/
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before;
import org.junit.Test;
......@@ -46,7 +41,7 @@ public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
......@@ -93,16 +88,7 @@ public class DefaultMqttMessageProcessorTest {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode());
CodecHelper.makeCustomHeaderToNet(request);
request.setBody(MqttEncodeDecodeUtil.encode(payload));
return request;
}
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
}
return null;
}
}
......@@ -22,10 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
......@@ -42,12 +41,10 @@ public class MqttConnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(
new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()));
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig()));
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes()));
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes()));
mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel);
}
......
......@@ -20,11 +20,10 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
......@@ -43,8 +42,7 @@ public class MqttDisconnectMessageHandlerTest {
@Test
public void testHandlerMessage() throws Exception {
SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController);
Client client = new Client();
......
......@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -48,7 +47,7 @@ public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private RemotingChannel remotingChannel;
......@@ -56,7 +55,7 @@ public class SendMessageProcessorTest {
private String topic = "snodeTopic";
private String group = "snodeGroup";
@Mock
private EnodeService enodeService;
......
......@@ -18,9 +18,8 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -46,7 +45,7 @@ import static org.mockito.Mockito.when;
public class NnodeServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NettyRemotingClient remotingClient;
......
......@@ -18,13 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
......@@ -56,7 +55,7 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
@Mock
private NnodeService nnodeService;
......
......@@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
......@@ -38,7 +37,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SlowConsumerServiceImplTest {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private final String enodeName = "testEndoe";
......
......@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
......@@ -33,8 +32,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class WillMessageServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
private WillMessageService willMessageService;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册