diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index dd380759b08fd5358e1583d5542af45891c34612..fc1b7e6a309833de0ca19c5d0320fe7d75ab9729 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -56,12 +56,12 @@ public class SnodeConfig {
private int snodeSendThreadPoolQueueCapacity = 10000;
- private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
-
private int snodeSendMessageMinPoolSize = 10;
private int snodeSendMessageMaxPoolSize = 20;
+ private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
+
private int snodeHandleMqttMessageMinPoolSize = 10;
private int snodeHandleMqttMessageMaxPoolSize = 20;
@@ -88,6 +88,12 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000;
+ private int snodePushMqttMessageMinPoolSize = 10;
+
+ private int snodePushMqttMessageMaxPoolSize = 20;
+
+ private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
+
private int slowConsumerThreshold = 1024;
private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
@@ -230,14 +236,6 @@ public class SnodeConfig {
this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
}
- public int getSnodeHandleMqttThreadPoolQueueCapacity() {
- return snodeHandleMqttThreadPoolQueueCapacity;
- }
-
- public void setSnodeHandleMqttThreadPoolQueueCapacity(
- int snodeHandleMqttThreadPoolQueueCapacity) {
- this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
- }
public int getSnodeSendMessageMinPoolSize() {
return snodeSendMessageMinPoolSize;
@@ -279,6 +277,14 @@ public class SnodeConfig {
this.snodeId = snodeId;
}
+ public int getSnodeHandleMqttThreadPoolQueueCapacity() {
+ return snodeHandleMqttThreadPoolQueueCapacity;
+ }
+
+ public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
+ this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
+ }
+
public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize;
}
@@ -359,6 +365,30 @@ public class SnodeConfig {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
+ public int getSnodePushMqttMessageMinPoolSize() {
+ return snodePushMqttMessageMinPoolSize;
+ }
+
+ public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
+ this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
+ }
+
+ public int getSnodePushMqttMessageMaxPoolSize() {
+ return snodePushMqttMessageMaxPoolSize;
+ }
+
+ public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
+ this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
+ }
+
+ public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
+ return snodePushMqttMessageThreadPoolQueueCapacity;
+ }
+
+ public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
+ this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
+ }
+
public String getSendMessageInterceptorPath() {
return sendMessageInterceptorPath;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java
index 33fa1a878cdf78f1524543678e6b4d2b4d298381..f52c100e1e05ab136a8d49ff60a93b7902686475 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java
@@ -68,8 +68,6 @@ public class MqttSubscriptionData extends SubscriptionData {
if (getClass() != obj.getClass())
return false;
MqttSubscriptionData other = (MqttSubscriptionData) obj;
- if (qos != other.qos)
- return false;
if (clientId != other.clientId) {
return false;
}
diff --git a/example/pom.xml b/example/pom.xml
index b192e03f2c0590b354e1b0f057073a023bff1ed5..6706d07e2bb3bcd64b567abba54b4364700411aa 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -62,5 +62,9 @@
io.prometheus
simpleclient_hotspot
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..8c52a8f1e62da0278a3a1cf77c2bbd95cabb1952
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.mqtt;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttSampleConsumer {
+
+ private static Logger log = LoggerFactory.getLogger(MqttSampleConsumer.class);
+
+ public static void main(String[] args) throws InterruptedException {
+ String topic = "mqtt-sample";
+ int qos = 0;
+ String broker = "tcp://127.0.0.1:1883";
+ String clinetId = "JavaSampleConsumer";
+
+ MemoryPersistence persistence = new MemoryPersistence();
+
+ {
+ try {
+ MqttClient sampleClient = new MqttClient(broker, clinetId, persistence);
+ MqttConnectOptions connectOptions = new MqttConnectOptions();
+ connectOptions.setCleanSession(true);
+ connectOptions.setKeepAliveInterval(6000);
+ log.info("Connecting to broker: " + broker);
+ sampleClient.connect(connectOptions);
+ log.info("Connected");
+ sampleClient.setCallback(new MqttCallback() {
+ @Override public void connectionLost(Throwable throwable) {
+ System.out.println("connection lost." + throwable.getLocalizedMessage());
+ }
+
+ @Override public void messageArrived(String s, MqttMessage message) throws Exception {
+ System.out.println(message.toString());
+// System.exit(0);
+ }
+
+ @Override public void deliveryComplete(IMqttDeliveryToken token) {
+ try {
+ System.out.println("delivery complete." + token.getMessage().toString());
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ log.info("Subscribing topic: " + topic);
+ sampleClient.subscribe(topic, qos);
+ log.info("Subsrcribe success.");
+ Thread.sleep(100000000);
+ } catch (MqttException me) {
+ log.error("reason " + me.getReasonCode());
+ log.error("msg " + me.getMessage());
+ log.error("loc " + me.getLocalizedMessage());
+ log.error("cause " + me.getCause());
+ log.error("excep " + me);
+ me.printStackTrace();
+ me.printStackTrace();
+ System.exit(1);
+ }
+ }
+ }
+
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..429a0187b9eaf8dbbc44caef946be6b45877110f
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.mqtt;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttSampleProducer {
+
+ private static Logger log = LoggerFactory.getLogger(MqttSampleProducer.class);
+
+ public static void main(String[] args) throws InterruptedException {
+ String topic = "mqtt-sample";
+ String messageContent = "hello mqtt";
+ int qos = 0;
+ String broker = "tcp://127.0.0.1:1883";
+ String clientId = "JavaSampleProducer";
+
+ MemoryPersistence persistence = new MemoryPersistence();
+
+ try {
+ MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setKeepAliveInterval(6000);
+ log.info("Connecting to broker: " + broker);
+ sampleClient.connect(connOpts);
+ log.info("Connected");
+ log.info("Publishing message: " + messageContent);
+ MqttMessage message = new MqttMessage(messageContent.getBytes());
+ message.setQos(qos);
+ message.setRetained(true);
+ sampleClient.publish(topic, message);
+ log.info("Message published");
+ /*sampleClient.disconnect();
+ log.info("Disconnected");*/
+ Thread.sleep(10000000);
+ } catch (MqttException me) {
+ log.error("reason " + me.getReasonCode());
+ log.error("msg " + me.getMessage());
+ log.error("loc " + me.getLocalizedMessage());
+ log.error("cause " + me.getCause());
+ log.error("excep " + me);
+ me.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index e18627d28af45b06d4458a6be5e4f23459658105..8b0ce4223e9a455928f1714f756c40750b74476e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -648,6 +648,11 @@
simpleclient_hotspot
0.6.0
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.0
+
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index e823a69c767d1ec8646cc769b65e97e1f5f13dc9..7c008fafba1d689c4010344efa9576da4a779b65 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -80,6 +80,8 @@ public class RemotingCommand {
private transient byte[] body;
+ private Object payload;
+
public RemotingCommand() {
}
@@ -259,6 +261,14 @@ public class RemotingCommand {
this.body = body;
}
+ public Object getPayload() {
+ return payload;
+ }
+
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+
public HashMap getExtFields() {
return extFields;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java
index 2199c77752917523d805d787e6b5958655ca85e0..41c21741de2d026ce925775728c81180a7fcaa26 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java
@@ -42,7 +42,7 @@ public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder<
if (!(msg instanceof MqttMessage)) {
return;
}
- RemotingCommand requestCommand = null;
+ RemotingCommand requestCommand;
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType());
if (message2MessageEncodeDecode == null) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java
index af4dc9141df41d255c72c8e1e4b8c952fc174ec9..e5c34c0ecbd7b6cf8e0712fb22ca42cdb7643a9d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java
@@ -43,8 +43,8 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<
if (!(msg instanceof RemotingCommand)) {
return;
}
- MqttMessage mqttMessage = null;
- MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class);
+ MqttMessage mqttMessage;
+ MqttHeader mqttHeader = (MqttHeader) msg.readCustomHeader();
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(
MqttMessageType.valueOf(mqttHeader.getMessageType()));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java
deleted file mode 100644
index 32735ccd506c9d559f4a6d69b35c1900bcec66d3..0000000000000000000000000000000000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.transport.mqtt;
-
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttConnectPayload;
-import io.netty.util.internal.StringUtil;
-import java.io.UnsupportedEncodingException;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
-
-/**
- * Payload of {@link MqttConnectMessage}
- */
-public final class RocketMQMqttConnectPayload extends RemotingSerializable {
-
- private String clientIdentifier;
- private String willTopic;
- private String willMessage;
- private String userName;
- private String password;
-
- public RocketMQMqttConnectPayload(
- String clientIdentifier,
- String willTopic,
- String willMessage,
- String userName,
- String password) {
- this.clientIdentifier = clientIdentifier;
- this.willTopic = willTopic;
- this.willMessage = willMessage;
- this.userName = userName;
- this.password = password;
- }
-
- public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) {
- return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(),
- payload.willMessage(), payload.userName(), payload.password());
- }
-
- public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException {
- return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes(
- RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET));
- }
- public String getClientIdentifier() {
- return clientIdentifier;
- }
-
- public String getWillTopic() {
- return willTopic;
- }
-
- public String getWillMessage() {
- return willMessage;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setClientIdentifier(String clientIdentifier) {
- this.clientIdentifier = clientIdentifier;
- }
-
- public void setWillTopic(String willTopic) {
- this.willTopic = willTopic;
- }
-
- public void setWillMessage(String willMessage) {
- this.willMessage = willMessage;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- @Override
- public String toString() {
- return new StringBuilder(StringUtil.simpleClassName(this))
- .append('[')
- .append("clientIdentifier=").append(clientIdentifier)
- .append(", willTopic=").append(willTopic)
- .append(", willMessage=").append(willMessage)
- .append(", userName=").append(userName)
- .append(", password=").append(password)
- .append(']')
- .toString();
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java
deleted file mode 100644
index 495ffbcf77786a856420ac0125416605d28170d3..0000000000000000000000000000000000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.transport.mqtt;
-
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
-import io.netty.handler.codec.mqtt.MqttSubAckPayload;
-import io.netty.util.internal.StringUtil;
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
-
-/**
- * Payload of {@link MqttSubAckMessage}
- */
-public final class RocketMQMqttSubAckPayload extends RemotingSerializable {
-
- private List grantedQoSLevels;
-
- public RocketMQMqttSubAckPayload(List grantedQoSLevels) {
- this.grantedQoSLevels = Collections.unmodifiableList(grantedQoSLevels);
- }
-
- public List getGrantedQoSLevels() {
- return grantedQoSLevels;
- }
-
- public void setGrantedQoSLevels(List grantedQoSLevels) {
- this.grantedQoSLevels = grantedQoSLevels;
- }
-
- public static RocketMQMqttSubAckPayload fromMqttSubAckPayload(MqttSubAckPayload payload) {
- return new RocketMQMqttSubAckPayload(payload.grantedQoSLevels());
- }
-
- public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException {
- return new MqttSubAckPayload(this.grantedQoSLevels);
- }
-
- @Override
- public String toString() {
- return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']';
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java
deleted file mode 100644
index 8b2a19d9f951f59b440135f6fb1e9c157324fd9f..0000000000000000000000000000000000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.transport.mqtt;
-
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribePayload;
-import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import io.netty.util.internal.StringUtil;
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
-
-/**
- * Payload of {@link MqttSubscribeMessage}
- */
-public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
-
- private List topicSubscriptions;
-
- public RocketMQMqttSubscribePayload(List topicSubscriptions) {
- this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
- }
-
- public List getTopicSubscriptions() {
- return topicSubscriptions;
- }
-
- public void setTopicSubscriptions(
- List topicSubscriptions) {
- this.topicSubscriptions = topicSubscriptions;
- }
-
- public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) {
- return new RocketMQMqttSubscribePayload(payload.topicSubscriptions());
- }
-
- public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException {
- return new MqttSubscribePayload(this.topicSubscriptions);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
- for (int i = 0; i < topicSubscriptions.size() - 1; i++) {
- builder.append(topicSubscriptions.get(i)).append(", ");
- }
- builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1));
- builder.append(']');
- return builder.toString();
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java
deleted file mode 100644
index 8e7484383d219f35138b657dddd7c6d463d08ff4..0000000000000000000000000000000000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.transport.mqtt;
-
-import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
-import io.netty.util.internal.StringUtil;
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
-
-/**
- * Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage}
- */
-public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
- private List topics;
-
- public RocketMQMqttUnSubscribePayload(List topics) {
- this.topics = topics;
- }
-
- public List getTopics() {
- return topics;
- }
-
- public void setTopics(List topics) {
- this.topics = Collections.unmodifiableList(topics);
- }
-
- public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) {
- return new RocketMQMqttUnSubscribePayload(payload.topics());
- }
-
- public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException {
- return new MqttUnsubscribePayload(this.topics);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
- for (int i = 0; i < topics.size() - 1; i++) {
- builder.append("topicName = ").append(topics.get(i)).append(", ");
- }
- builder.append("topicName = ").append(topics.get(topics.size() - 1))
- .append(']');
- return builder.toString();
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
index 27e40cf25006e37b77b3810bbd00a7debd944dcf..6b3cb35cbadc7835ef6815325c9c555c2a5b49de 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
@@ -29,15 +29,15 @@ public class EncodeDecodeDispatcher {
encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null);
- encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null);
- encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null);
+ encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, new MqttPublishEncodeDecode());
+ encodeDecodeDispatcher.put(MqttMessageType.PUBACK, new MqttPubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null);
- encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null);
- encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null);
- encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null);
- encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null);
+ encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, new MqttSubscribeEncodeDecode());
+ encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode());
+ encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode());
+ encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java
index ad0570b2bf7b6586c8d93156ea5764363302167d..97d0dfd29101bbdc8c5865ab87436c95b927ae81 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
+import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -25,5 +26,5 @@ public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage);
- MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException;
+ MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
index 0f0f3d79440958787064e68dfa0cd2d68b9a0ff8..61586742889c37285aa9b514165e5c5dbe2cc51a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java
@@ -21,18 +21,14 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
-import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
- RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
- .fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload());
- RemotingCommand requestCommand = null;
+ RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage
.variableHeader();
@@ -57,9 +53,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- CodecHelper.makeCustomHeaderToNet(requestCommand);
-
- requestCommand.setBody(payload.encode());
+ requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload());
return requestCommand;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java
index f7c60190c7f3b3015e8000e8fe072462d7e1bb16..1f3dc2a41b62cbd97fa3ee4ffb1ae78c4d1ba109 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java
@@ -37,9 +37,7 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
- MqttHeader mqttHeader = (MqttHeader) remotingCommand
- .decodeCommandCustomHeader(MqttHeader.class);
-
+ MqttHeader mqttHeader = (MqttHeader)remotingCommand.readCustomHeader();
return new MqttConnAckMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java
index fde2fcc96fdf04ced076c77ab74b3061a6b51f4c..0f11a6b8576b2288c5ef79f98946905850f4055e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java
@@ -36,8 +36,7 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
- MqttHeader mqttHeader = (MqttHeader) remotingCommand
- .decodeCommandCustomHeader(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
index ee76e95dbe9f630288591f3aa199970c07c339e6..1b30acd10701db68a2755bdbfb06df5410129a0f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java
@@ -18,11 +18,12 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import org.apache.rocketmq.remoting.netty.CodecHelper;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
@@ -30,13 +31,9 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
- ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload();
- byte[] payload = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(payload);
-
- RemotingCommand requestCommand = null;
+ RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
- MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage
+ MqttPublishVariableHeader variableHeader = (MqttPublishVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
@@ -46,19 +43,23 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
- mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name());
- mqttHeader.setSessionPresent(variableHeader.isSessionPresent());
+ mqttHeader.setTopicName(variableHeader.topicName());
+ mqttHeader.setPacketId(variableHeader.packetId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- CodecHelper.makeCustomHeaderToNet(requestCommand);
-
- requestCommand.setBody(payload);
+ //invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead
+ requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy());
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
- return null;
+ MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
+ return new MqttPublishMessage(
+ new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(),
+ MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
+ mqttHeader.getRemainingLength()),
+ new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload());
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
index 8c49e61cd787694fccaaa582814bfcc37bfa7768..c87bc14979e9baa27f5e49627eeadbfd1d4daaac 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
@@ -36,14 +37,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
}
@Override
- public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
- MqttHeader mqttHeader = (MqttHeader) remotingCommand
- .decodeCommandCustomHeader(MqttHeader.class);
-
+ public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
+ MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttSubAckMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
- MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload());
+ MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload());
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
index 338672810284714bdfa65ee1d03819d68d0a9a70..dec009cf49d8573c07fc3b0441e4174c8a770d25 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java
@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
- RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload
- .fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
-
- RemotingCommand requestCommand = null;
+ RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
@@ -49,9 +44,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
- CodecHelper.makeCustomHeaderToNet(requestCommand);
-
- requestCommand.setBody(payload.encode());
+ requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload());
return requestCommand;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java
index 21f51b640baf67a75dd106c5e9ddbdeda434956d..df0a1d467bcc312049ed1954bce0bba0e6e4632f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java
@@ -36,8 +36,7 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
- MqttHeader mqttHeader = (MqttHeader) remotingCommand
- .decodeCommandCustomHeader(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttUnsubAckMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
index 88c8518b8900f8c17d51bb33e0731cd4efda7d67..957754bbb9262e006516f839f1053672c0ea4d78 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java
@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
-import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
- RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload
- .fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
-
- RemotingCommand requestCommand = null;
+ RemotingCommand requestCommand;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
@@ -44,14 +39,10 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
-
mqttHeader.setMessageId(variableHeader.messageId());
- requestCommand = RemotingCommand
- .createRequestCommand(1000, mqttHeader);
- CodecHelper.makeCustomHeaderToNet(requestCommand);
-
- requestCommand.setBody(payload.encode());
+ requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader);
+ requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload());
return requestCommand;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 1d6397c032c3d008982ce2246eb40edb837f639e..f06fa0f40ee5c031e848b713fb09b54353b5b8b2 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
+import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
@@ -118,6 +119,7 @@ public class SnodeController {
private SlowConsumerService slowConsumerService;
private MetricsService metricsService;
private WillMessageService willMessageService;
+ private MqttPushServiceImpl mqttPushService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -136,11 +138,15 @@ public class SnodeController {
}
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
- this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
- .init(this.getNettyClientConfig(), null);
+ this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient();
+ if (this.remotingClient != null) {
+ this.remotingClient.init(this.getNettyClientConfig(), null);
+ }
this.mqttRemotingClient = RemotingClientFactory.getInstance()
- .createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
- .init(this.getNettyClientConfig(), null);
+ .createRemotingClient(RemotingUtil.MQTT_PROTOCOL);
+ if (this.mqttRemotingClient != null) {
+ this.mqttRemotingClient.init(this.getNettyClientConfig(), null);
+ }
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
@@ -211,6 +217,7 @@ public class SnodeController {
this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this);
+ this.mqttPushService = new MqttPushServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
@@ -233,15 +240,21 @@ public class SnodeController {
}
public boolean initialize() {
- this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
- this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
- .init(this.nettyServerConfig, this.clientHousekeepingService);
- this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
initAclInterceptorGroup();
- this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
- this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
+ this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer();
+ if (this.snodeServer != null) {
+ this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+ this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
+ }
+ this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
+ RemotingUtil.MQTT_PROTOCOL);
+ if (this.mqttRemotingServer != null) {
+ this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+ this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
+ }
+ registerProcessor();
return true;
}
@@ -313,30 +326,40 @@ public class SnodeController {
}
public void registerProcessor() {
- this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
- this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
- this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
- this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
- this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
+ if (snodeServer != null) {
+ this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+ }
+ if (mqttRemotingServer != null) {
+ this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
+ }
}
public void start() {
initialize();
- this.snodeServer.start();
- this.mqttRemotingServer.start();
+ if (snodeServer != null) {
+ this.snodeServer.start();
+ }
+ if (mqttRemotingServer != null) {
+ this.mqttRemotingServer.start();
+ }
this.remotingClient.start();
- this.mqttRemotingClient.start();
+ if (mqttRemotingClient != null) {
+ this.mqttRemotingClient.start();
+ }
this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
this.metricsService.start(this.snodeConfig.getMetricsExportPort());
@@ -526,4 +549,12 @@ public class SnodeController {
WillMessageService willMessageService) {
this.willMessageService = willMessageService;
}
+
+ public MqttPushServiceImpl getMqttPushService() {
+ return mqttPushService;
+ }
+
+ public void setMqttPushService(MqttPushServiceImpl mqttPushService) {
+ this.mqttPushService = mqttPushService;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 70797711d8d9e11dd3ab32a03b3f33f7a1efe088..a8d5b7ddab9d5bbc2005f3ecb45ba18091defa33 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -119,6 +120,22 @@ public class SnodeStartup {
System.exit(-2);
}
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), snodeConfig);
+ String namesrvAddr = snodeConfig.getNamesrvAddr();
+ if (null != namesrvAddr) {
+ try {
+ String[] addrArray = namesrvAddr.split(";");
+ for (String addr : addrArray) {
+ RemotingUtil.string2SocketAddress(addr);
+ }
+ } catch (Exception e) {
+ System.out.printf(
+ "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+ namesrvAddr);
+ System.exit(-3);
+ }
+ }
+
MixAll.printObjectProperties(log, snodeConfig);
MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
index 5ebe7454391c9b3fa30a2386879fcaeac8150052..abaf30dfd36f8f4ec91d5dfef7a133095829bd73 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
@@ -44,6 +44,8 @@ public class Client {
private boolean cleanSession;
+ private boolean willFlag;
+
private String snodeAddress;
public ClientRole getClientRole() {
@@ -71,13 +73,14 @@ public class Client {
language == client.language &&
isConnected == client.isConnected &&
cleanSession == client.cleanSession &&
+ willFlag == client.willFlag &&
snodeAddress == client.snodeAddress;
}
@Override
public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval,
- lastUpdateTimestamp, version, language, isConnected, cleanSession, snodeAddress);
+ lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress);
}
public RemotingChannel getRemotingChannel() {
@@ -144,6 +147,14 @@ public class Client {
this.cleanSession = cleanSession;
}
+ public boolean isWillFlag() {
+ return willFlag;
+ }
+
+ public void setWillFlag(boolean willFlag) {
+ this.willFlag = willFlag;
+ }
+
public String getSnodeAddress() {
return snodeAddress;
}
@@ -173,6 +184,7 @@ public class Client {
", language=" + language +
", isConnected=" + isConnected +
", cleanSession=" + cleanSession +
+ ", willFlag=" + willFlag +
", snodeAddress=" + snodeAddress +
'}';
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index 5e75322c4bca9cf4036dc6605746bce06aae0f3d..899a9ef24e0cf7d5474ab76c6cecbe58536ad681 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -43,7 +43,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void start(long interval) {
this.producerManager.startScan(interval);
this.consumerManager.startScan(interval);
- this.iotClientManager.startScan(interval);
+// this.iotClientManager.startScan(interval);
}
public void shutdown() {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
index 133f346a5bba3066fd7d38cd98165b94a38797a0..d0cddceca61d28d0caf3b855ba19a2a2d815cffe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
@@ -144,7 +144,7 @@ public abstract class ClientManagerImpl implements ClientManager {
return updated;
}
- private void removeClient(String groupId, RemotingChannel remotingChannel) {
+ protected void removeClient(String groupId, RemotingChannel remotingChannel) {
ConcurrentHashMap channelTable = groupClientTable.get(groupId);
if (channelTable != null) {
Client prev = channelTable.remove(remotingChannel);
@@ -211,6 +211,9 @@ public abstract class ClientManagerImpl implements ClientManager {
}
ConcurrentHashMap channelClientMap = groupClientTable
.get(groupId);
+ if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
+ remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel());
+ }
return channelClientMap.get(remotingChannel);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java
index 442850f0d941c0399ef2128d6ed2ecd8acda6085..66ec2f7ef9ef8083cef70681a65894af92166373 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java
@@ -19,9 +19,10 @@ package org.apache.rocketmq.snode.client.impl;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
@@ -35,7 +36,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOT_GROUP = "IOT_GROUP";
private final SnodeController snodeController;
- private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>(
+ private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>(
1024);
private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024);
@@ -47,9 +48,25 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
//do the logic when client sends unsubscribe packet.
}
+ @Override public void onClose(Set groups, RemotingChannel remotingChannel) {
+ for (String groupId : groups) {
+ //remove client after invoking onClosed method(client may be used in onClosed)
+ onClosed(groupId, remotingChannel);
+ removeClient(groupId, remotingChannel);
+ }
+ }
+
@Override
public void onClosed(String group, RemotingChannel remotingChannel) {
//do the logic when connection is closed by any reason.
+ //step1. Clean subscription data if cleanSession=1
+ Client client = this.getClient(IOT_GROUP, remotingChannel);
+ if (client.isCleanSession()) {
+ cleanSessionState(client.getClientId());
+ }
+ //step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.)
+
+ //step3. If will retain is true, add the will message to retain message.
}
@Override
@@ -63,10 +80,10 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void cleanSessionState(String clientId) {
clientId2Subscription.remove(clientId);
- for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
- Map.Entry>> next = iterator.next();
- for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
- Map.Entry> next1 = iterator1.next();
+ for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
+ Map.Entry>> next = iterator.next();
+ for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
+ Map.Entry> next1 = iterator1.next();
if (!next1.getKey().getClientId().equals(clientId)) {
continue;
}
@@ -84,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
return snodeController;
}
- public ConcurrentHashMap>> getTopic2SubscriptionTable() {
+ public ConcurrentHashMap>> getTopic2SubscriptionTable() {
return topic2SubscriptionTable;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
index 80ff2d1fb6c01b6b6a66732860ea9f2071631eae..8ae898443211684b8ad7f3c69f0dd17401eb5f4b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
@@ -17,15 +17,20 @@
package org.apache.rocketmq.snode.processor;
-import com.alibaba.fastjson.JSON;
+import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -33,11 +38,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
@@ -83,20 +86,31 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException, UnsupportedEncodingException {
- MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null;
switch (fixedHeader.messageType()) {
case CONNECT:
- MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
+ MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
- RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class);
- mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload());
+ MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
+ mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
+ break;
+ case PUBLISH:
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
+ mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload());
+ break;
+ case SUBSCRIBE:
+ MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId());
+ mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload());
+ break;
+ case UNSUBSCRIBE:
+ case PINGREQ:
case DISCONNECT:
}
return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
@@ -107,11 +121,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
return false;
}
- private T decode(final byte[] data, Class classOfT) {
- final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET));
- return JSON.parseObject(json, classOfT);
- }
-
private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java
index 8f4a412faa27b2f8a586c934fce4967301bfc669..96f2843b9839da4071842aaa7d7fb9170c6457f2 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
+import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
@@ -92,6 +93,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
}
//treat a second CONNECT packet as a protocol violation and disconnect
if (isConnected(remotingChannel, payload.clientIdentifier())) {
+ log.error("This client has been connected. The second CONNECT packet is treated as a protocol vialation and the connection will be closed.");
remotingChannel.close();
return null;
}
@@ -118,6 +120,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
Client client = new Client();
client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT);
+ client.setGroups(new HashSet(){{add("IOT_GROUP");}});
client.setConnected(true);
client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java
index b5229478a96befc3b9f2227b673afed1324161ea..66affac46f45ad59ea5974f918df091495805249 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java
@@ -62,7 +62,9 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
}
client.setConnected(false);
- remotingChannel.close();
+ if (remotingChannel.isActive()) {
+ remotingChannel.close();
+ }
return null;
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java
index 6fec9f4808852c6ef1ddbf67f8e2d1db3362cd69..17776ea0f452c7831d3babe2f2f7bb88cb6f6edc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java
@@ -17,13 +17,27 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
+import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPublishMessageHandler implements MessageHandler {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) {
@@ -32,7 +46,40 @@ public class MqttPublishMessageHandler implements MessageHandler {
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
+ if (!(message instanceof MqttPublishMessage)) {
+ log.error("Wrong message type! Expected type: PUBLISH but {} was received.", message.fixedHeader().messageType());
+ throw new WrongMessageTypeException("Wrong message type exception.");
+ }
+ MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
+ MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
+ MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
+ if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) {
+ log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
+ remotingChannel.close();
+ return null;
+ }
+
+ ByteBuf payload = mqttPublishMessage.payload();
+ if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
+ snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
+ } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
+ //Push messages to subscribers and add it to IN-FLIGHT messages
+ }
+ if (fixedHeader.qosLevel().value() > 0) {
+ RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
+ if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
+ mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
+ mqttHeader.setDup(false);
+ mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
+ mqttHeader.setRetain(false);
+ mqttHeader.setRemainingLength(2);
+ mqttHeader.setPacketId(0);
+ } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
+ //PUBREC/PUBREL/PUBCOMP
+ }
+ return command;
+ }
return null;
}
-
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
index 5bbec46db12d9945f4202950b1573fc91ae1de49..5999996701cb32ed83ffdad164c5aed4ba7a66fc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
@@ -25,7 +25,9 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -36,7 +38,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubAckPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
@@ -80,6 +81,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
if (payload.topicSubscriptions() == null || payload.topicSubscriptions().size() == 0) {
log.error("The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
+ return null;
}
if (isQosLegal(payload.topicSubscriptions())) {
log.error("The QoS level of Topic Filter / QoS pairs should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
@@ -97,12 +99,13 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
-// mqttHeader.setRemainingLength(0x02);
mqttHeader.setMessageId(mqttSubscribeMessage.variableHeader().messageId());
List grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager);
- RocketMQMqttSubAckPayload ackPayload = RocketMQMqttSubAckPayload.fromMqttSubAckPayload(new MqttSubAckPayload(grantQoss));
- command.setBody(ackPayload.encode());
+ //Publish retained messages to subscribers.
+ MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss);
+ command.setPayload(mqttSubAckPayload);
+ mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size());
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
return command;
@@ -111,9 +114,9 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
private List doSubscribe(Client client, List mqttTopicSubscriptions,
IOTClientManagerImpl iotClientManager) {
//do the logic when client sends subscribe packet.
- //1.register clientId2Subscription
+ //1.update clientId2Subscription
ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription();
- ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
+ ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Subscription subscription = null;
if (clientId2Subscription.containsKey(client.getClientId())) {
subscription = clientId2Subscription.get(client.getClientId());
@@ -128,7 +131,25 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
grantQoss.add(actualQos);
SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName());
subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData);
- //2.register topic2SubscriptionTable
+ //2.update topic2SubscriptionTable
+ String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
+ ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
+ if (client2SubscriptionData == null) {
+ client2SubscriptionData = new ConcurrentHashMap<>();
+ ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
+ if (prev != null) {
+ client2SubscriptionData = prev;
+ }
+ Set subscriptionDataSet = client2SubscriptionData.get(client);
+ if (subscriptionDataSet == null) {
+ subscriptionDataSet = new HashSet<>();
+ Set prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet);
+ if (prevSubscriptionDataSet != null) {
+ subscriptionDataSet = prevSubscriptionDataSet;
+ }
+ subscriptionDataSet.add(subscriptionData);
+ }
+ }
}
return grantQoss;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java
index d2feaa58c975c39ab8e331cd56157105fdbdbdfd..38f6519dfbc3673a11572d6f5cdb69cbefdf3a25 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java
@@ -17,34 +17,109 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.Client;
+import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
+import org.apache.rocketmq.snode.client.impl.Subscription;
+import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
+import org.apache.rocketmq.snode.util.MqttUtil;
/**
* handle the UNSUBSCRIBE message from the client
- *
- * - extract topic filters to be un-subscribed
- * - get the topics matching with the topic filters
- * - verify the authorization of the client to the
- * - remove subscription from the SubscriptionStore
- *
*/
public class MqttUnsubscribeMessagHandler implements MessageHandler {
-/* private SubscriptionStore subscriptionStore;
-
- public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) {
- this.subscriptionStore = subscriptionStore;
- }*/
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
+
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
- return null;
+ if (!(message instanceof MqttUnsubscribeMessage)) {
+ log.error("Wrong message type! Expected type: UNSUBSCRIBE but {} was received. MqttMessage={}", message.fixedHeader().messageType(), message.toString());
+ throw new WrongMessageTypeException("Wrong message type exception.");
+ }
+ MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) message;
+ MqttFixedHeader fixedHeader = unsubscribeMessage.fixedHeader();
+ if (fixedHeader.isDup() || !fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE) || fixedHeader.isRetain()) {
+ log.error("Malformed value of reserved bits(bits 3,2,1,0) of fixed header. Expected=0010, received={}{}{}{}", fixedHeader.isDup() ? 1 : 0, Integer.toBinaryString(fixedHeader.qosLevel().value()), fixedHeader.isRetain() ? 1 : 0);
+ remotingChannel.close();
+ return null;
+ }
+ MqttUnsubscribePayload payload = unsubscribeMessage.payload();
+ if (payload.topics() == null || payload.topics().size() == 0) {
+ log.error("The payload of a UNSUBSCRIBE packet MUST contain at least one Topic Filter. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
+ remotingChannel.close();
+ return null;
+ }
+ IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+ Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
+ if (client == null) {
+ log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
+ remotingChannel.close();
+ return null;
+ }
+
+ RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
+ mqttHeader.setMessageType(MqttMessageType.SUBACK.value());
+ mqttHeader.setDup(false);
+ mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
+ mqttHeader.setRetain(false);
+ mqttHeader.setMessageId(unsubscribeMessage.variableHeader().messageId());
+
+ doUnsubscribe(client, payload.topics(), iotClientManager);
+
+ mqttHeader.setRemainingLength(0x02);
+ command.setRemark(null);
+ command.setCode(ResponseCode.SUCCESS);
+ return command;
+ }
+
+ private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) {
+ ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription();
+ ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
+
+ for (String topicFilter : topics) {
+ //1.update clientId2Subscription
+ if (clientId2Subscription.containsKey(client.getClientId())) {
+ Subscription subscription = clientId2Subscription.get(client.getClientId());
+ subscription.getSubscriptionTable().remove(topicFilter);
+ }
+ //2.update topic2SubscriptionTable
+ String rootTopic = MqttUtil.getRootTopic(topicFilter);
+ ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
+ if (client2SubscriptionData != null) {
+ Set subscriptionDataSet = client2SubscriptionData.get(client);
+ if (subscriptionDataSet != null) {
+ Iterator iterator = subscriptionDataSet.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getTopic().equals(topicFilter))
+ iterator.remove();
+ }
+ }
+ }
+ }
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..ea8b9734e4a3b82a1ae36616a775f5c47504de50
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.service.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.util.ReferenceCountUtil;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.Client;
+import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
+import org.apache.rocketmq.snode.util.MqttUtil;
+
+public class MqttPushServiceImpl {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private SnodeController snodeController;
+ private ExecutorService pushMqttMessageExecutorService;
+
+ public MqttPushServiceImpl(final SnodeController snodeController) {
+ this.snodeController = snodeController;
+ pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
+ this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(),
+ this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(),
+ 3000,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()),
+ "SnodePushMqttMessageThread",
+ false);
+ }
+
+ public class MqttPushTask implements Runnable {
+ private AtomicBoolean canceled = new AtomicBoolean(false);
+ private final ByteBuf message;
+ private final String topic;
+ private final Integer qos;
+ private boolean retain;
+ private Integer packetId;
+
+ public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain,
+ Integer packetId) {
+ this.message = message;
+ this.topic = topic;
+ this.qos = qos;
+ this.retain = retain;
+ this.packetId = packetId;
+ }
+
+ @Override
+ public void run() {
+ if (!canceled.get()) {
+ try {
+ RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
+
+ //find those clients publishing the message to
+ IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+ ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
+ Set clients = new HashSet<>();
+ if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
+ ConcurrentHashMap> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic));
+ for (Map.Entry> entry : client2SubscriptionDatas.entrySet()) {
+ Set subscriptionDatas = entry.getValue();
+ for (SubscriptionData subscriptionData : subscriptionDatas) {
+ if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) {
+ clients.add(entry.getKey());
+ break;
+ }
+ }
+ }
+ }
+ for (Client client : clients) {
+ RemotingChannel remotingChannel = client.getRemotingChannel();
+ if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
+ remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
+ }
+ requestCommand.setPayload(message.copy());
+ snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ }
+ } catch (Exception ex) {
+ log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
+ }finally {
+ System.out.println("Release Bytebuf");
+ ReferenceCountUtil.release(message);
+ }
+ } else {
+ log.info("Push message to topic: {} canceled!", topic);
+ }
+ }
+
+ private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain,
+ Integer packetId) {
+ MqttHeader mqttHeader = new MqttHeader();
+ mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
+ if (qos == 0) {
+ mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
+ } else {
+ mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
+ }
+ mqttHeader.setQosLevel(qos);
+ mqttHeader.setRetain(retain);
+ mqttHeader.setPacketId(packetId);
+ mqttHeader.setTopicName(topic);
+ mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
+
+ RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
+// pushMessage.setPayload(message);
+ return pushMessage;
+ }
+
+ public void setCanceled(AtomicBoolean canceled) {
+ this.canceled = canceled;
+ }
+
+ }
+
+ public void pushMessageQos0(final String topic, final ByteBuf message) {
+ MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0);
+ pushMqttMessageExecutorService.submit(pushTask);
+ }
+
+ public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain,
+ Integer packetId) {
+ MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId);
+ pushMqttMessageExecutorService.submit(pushTask);
+ }
+
+ public void shutdown() {
+ this.pushMqttMessageExecutorService.shutdown();
+ }
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java
index 6064491c4d8cc45e6e09cd2526785ac861af904c..ef44a7acdb4bf6ff080fe1c7fd3c762487ae7af3 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.util;
+import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.UUID;
import org.apache.rocketmq.snode.constant.MqttConstant;
@@ -33,4 +34,33 @@ public class MqttUtil {
public static int actualQos(int qos) {
return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos);
}
+
+ public static boolean isQosLegal(MqttQoS qos) {
+ if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) {
+ return false;
+ }
+ return false;
+ }
+
+ public static boolean isMatch(String topicFiter, String topic) {
+ if (!topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_PLUS) && !topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_SHARP)) {
+ return topicFiter.equals(topic);
+ }
+ String[] filterTopics = topicFiter.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
+ String[] actualTopics = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
+
+ int i = 0;
+ for (; i < filterTopics.length && i < actualTopics.length; i++) {
+ if (MqttConstant.SUBSCRIPTION_FLAG_PLUS.equals(filterTopics[i])) {
+ continue;
+ }
+ if (MqttConstant.SUBSCRIPTION_FLAG_SHARP.equals(filterTopics[i])) {
+ return true;
+ }
+ if (!filterTopics[i].equals(actualTopics[i])) {
+ return false;
+ }
+ }
+ return i == actualTopics.length;
+ }
}
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
index 85a862f4671e255ee6c0b3981aeb90d9a2136de4..6f75a3266aecd257d68c4e5cd3489cec6ff43988 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
@@ -16,23 +16,18 @@
*/
package org.apache.rocketmq.snode.processor;
-import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before;
import org.junit.Test;
@@ -93,16 +88,7 @@ public class DefaultMqttMessageProcessorTest {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
- request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode());
- CodecHelper.makeCustomHeaderToNet(request);
+ request.setPayload(payload);
return request;
}
-
- private byte[] encode(Object obj) {
- String json = JSON.toJSONString(obj, false);
- if (json != null) {
- return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
- }
- return null;
- }
}