From 5a3102d9f4fd74949d1d31c4c6b75bd9965725c8 Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Thu, 21 Feb 2019 01:18:24 +0800 Subject: [PATCH] completing qos=0 message pub and sub --- .../apache/rocketmq/common/SnodeConfig.java | 50 ++++-- .../heartbeat/MqttSubscriptionData.java | 2 - example/pom.xml | 4 + .../example/mqtt/MqttSampleConsumer.java | 86 +++++++++ .../example/mqtt/MqttSampleProducer.java | 68 ++++++++ pom.xml | 5 + .../remoting/protocol/RemotingCommand.java | 10 ++ .../MqttMessage2RemotingCommandHandler.java | 2 +- .../RemotingCommand2MqttMessageHandler.java | 4 +- .../mqtt/RocketMQMqttConnectPayload.java | 112 ------------ .../mqtt/RocketMQMqttSubAckPayload.java | 59 ------- .../mqtt/RocketMQMqttSubscribePayload.java | 67 ------- .../mqtt/RocketMQMqttUnSubscribePayload.java | 63 ------- .../dispatcher/EncodeDecodeDispatcher.java | 12 +- .../Message2MessageEncodeDecode.java | 3 +- .../dispatcher/MqttConnectEncodeDecode.java | 10 +- .../MqttConnectackEncodeDecode.java | 4 +- .../dispatcher/MqttPubackEncodeDecode.java | 3 +- .../dispatcher/MqttPublishEncodeDecode.java | 29 ++-- .../dispatcher/MqttSubackEncodeDecode.java | 9 +- .../dispatcher/MqttSubscribeEncodeDecode.java | 11 +- .../dispatcher/MqttUnSubackEncodeDecode.java | 3 +- .../MqttUnSubscribeEncodeDecode.java | 15 +- .../rocketmq/snode/SnodeController.java | 89 ++++++---- .../apache/rocketmq/snode/SnodeStartup.java | 17 ++ .../apache/rocketmq/snode/client/Client.java | 14 +- .../client/ClientHousekeepingService.java | 2 +- .../snode/client/impl/ClientManagerImpl.java | 5 +- .../client/impl/IOTClientManagerImpl.java | 31 +++- .../DefaultMqttMessageProcessor.java | 35 ++-- .../MqttConnectMessageHandler.java | 3 + .../MqttDisconnectMessageHandler.java | 4 +- .../MqttPublishMessageHandler.java | 49 +++++- .../MqttSubscribeMessageHandler.java | 35 +++- .../MqttUnsubscribeMessagHandler.java | 99 +++++++++-- .../service/impl/MqttPushServiceImpl.java | 163 ++++++++++++++++++ .../apache/rocketmq/snode/util/MqttUtil.java | 30 ++++ .../DefaultMqttMessageProcessorTest.java | 16 +- 38 files changed, 757 insertions(+), 466 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java create mode 100644 example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java delete mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java delete mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java delete mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java delete mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index dd380759..fc1b7e6a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -56,12 +56,12 @@ public class SnodeConfig { private int snodeSendThreadPoolQueueCapacity = 10000; - private int snodeHandleMqttThreadPoolQueueCapacity = 10000; - private int snodeSendMessageMinPoolSize = 10; private int snodeSendMessageMaxPoolSize = 20; + private int snodeHandleMqttThreadPoolQueueCapacity = 10000; + private int snodeHandleMqttMessageMinPoolSize = 10; private int snodeHandleMqttMessageMaxPoolSize = 20; @@ -88,6 +88,12 @@ public class SnodeConfig { private int snodePushMessageThreadPoolQueueCapacity = 10000; + private int snodePushMqttMessageMinPoolSize = 10; + + private int snodePushMqttMessageMaxPoolSize = 20; + + private int snodePushMqttMessageThreadPoolQueueCapacity = 10000; + private int slowConsumerThreshold = 1024; private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor"; @@ -230,14 +236,6 @@ public class SnodeConfig { this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity; } - public int getSnodeHandleMqttThreadPoolQueueCapacity() { - return snodeHandleMqttThreadPoolQueueCapacity; - } - - public void setSnodeHandleMqttThreadPoolQueueCapacity( - int snodeHandleMqttThreadPoolQueueCapacity) { - this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; - } public int getSnodeSendMessageMinPoolSize() { return snodeSendMessageMinPoolSize; @@ -279,6 +277,14 @@ public class SnodeConfig { this.snodeId = snodeId; } + public int getSnodeHandleMqttThreadPoolQueueCapacity() { + return snodeHandleMqttThreadPoolQueueCapacity; + } + + public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) { + this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; + } + public int getSnodeHandleMqttMessageMinPoolSize() { return snodeHandleMqttMessageMinPoolSize; } @@ -359,6 +365,30 @@ public class SnodeConfig { this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity; } + public int getSnodePushMqttMessageMinPoolSize() { + return snodePushMqttMessageMinPoolSize; + } + + public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) { + this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize; + } + + public int getSnodePushMqttMessageMaxPoolSize() { + return snodePushMqttMessageMaxPoolSize; + } + + public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) { + this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize; + } + + public int getSnodePushMqttMessageThreadPoolQueueCapacity() { + return snodePushMqttMessageThreadPoolQueueCapacity; + } + + public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) { + this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity; + } + public String getSendMessageInterceptorPath() { return sendMessageInterceptorPath; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java index 33fa1a87..f52c100e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java @@ -68,8 +68,6 @@ public class MqttSubscriptionData extends SubscriptionData { if (getClass() != obj.getClass()) return false; MqttSubscriptionData other = (MqttSubscriptionData) obj; - if (qos != other.qos) - return false; if (clientId != other.clientId) { return false; } diff --git a/example/pom.xml b/example/pom.xml index b192e03f..6706d07e 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -62,5 +62,9 @@ io.prometheus simpleclient_hotspot + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java new file mode 100644 index 00000000..8c52a8f1 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.mqtt; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttSampleConsumer { + + private static Logger log = LoggerFactory.getLogger(MqttSampleConsumer.class); + + public static void main(String[] args) throws InterruptedException { + String topic = "mqtt-sample"; + int qos = 0; + String broker = "tcp://127.0.0.1:1883"; + String clinetId = "JavaSampleConsumer"; + + MemoryPersistence persistence = new MemoryPersistence(); + + { + try { + MqttClient sampleClient = new MqttClient(broker, clinetId, persistence); + MqttConnectOptions connectOptions = new MqttConnectOptions(); + connectOptions.setCleanSession(true); + connectOptions.setKeepAliveInterval(6000); + log.info("Connecting to broker: " + broker); + sampleClient.connect(connectOptions); + log.info("Connected"); + sampleClient.setCallback(new MqttCallback() { + @Override public void connectionLost(Throwable throwable) { + System.out.println("connection lost." + throwable.getLocalizedMessage()); + } + + @Override public void messageArrived(String s, MqttMessage message) throws Exception { + System.out.println(message.toString()); +// System.exit(0); + } + + @Override public void deliveryComplete(IMqttDeliveryToken token) { + try { + System.out.println("delivery complete." + token.getMessage().toString()); + } catch (MqttException e) { + e.printStackTrace(); + } + } + }); + log.info("Subscribing topic: " + topic); + sampleClient.subscribe(topic, qos); + log.info("Subsrcribe success."); + Thread.sleep(100000000); + } catch (MqttException me) { + log.error("reason " + me.getReasonCode()); + log.error("msg " + me.getMessage()); + log.error("loc " + me.getLocalizedMessage()); + log.error("cause " + me.getCause()); + log.error("excep " + me); + me.printStackTrace(); + me.printStackTrace(); + System.exit(1); + } + } + } + +} diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java new file mode 100644 index 00000000..429a0187 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleProducer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.mqtt; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttSampleProducer { + + private static Logger log = LoggerFactory.getLogger(MqttSampleProducer.class); + + public static void main(String[] args) throws InterruptedException { + String topic = "mqtt-sample"; + String messageContent = "hello mqtt"; + int qos = 0; + String broker = "tcp://127.0.0.1:1883"; + String clientId = "JavaSampleProducer"; + + MemoryPersistence persistence = new MemoryPersistence(); + + try { + MqttClient sampleClient = new MqttClient(broker, clientId, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + connOpts.setKeepAliveInterval(6000); + log.info("Connecting to broker: " + broker); + sampleClient.connect(connOpts); + log.info("Connected"); + log.info("Publishing message: " + messageContent); + MqttMessage message = new MqttMessage(messageContent.getBytes()); + message.setQos(qos); + message.setRetained(true); + sampleClient.publish(topic, message); + log.info("Message published"); + /*sampleClient.disconnect(); + log.info("Disconnected");*/ + Thread.sleep(10000000); + } catch (MqttException me) { + log.error("reason " + me.getReasonCode()); + log.error("msg " + me.getMessage()); + log.error("loc " + me.getLocalizedMessage()); + log.error("cause " + me.getCause()); + log.error("excep " + me); + me.printStackTrace(); + System.exit(1); + } + } +} diff --git a/pom.xml b/pom.xml index e18627d2..8b0ce422 100644 --- a/pom.xml +++ b/pom.xml @@ -648,6 +648,11 @@ simpleclient_hotspot 0.6.0 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.0 + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index e823a69c..7c008faf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -80,6 +80,8 @@ public class RemotingCommand { private transient byte[] body; + private Object payload; + public RemotingCommand() { } @@ -259,6 +261,14 @@ public class RemotingCommand { this.body = body; } + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + public HashMap getExtFields() { return extFields; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java index 2199c777..41c21741 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java @@ -42,7 +42,7 @@ public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder< if (!(msg instanceof MqttMessage)) { return; } - RemotingCommand requestCommand = null; + RemotingCommand requestCommand; Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher .getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType()); if (message2MessageEncodeDecode == null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java index af4dc914..e5c34c0e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java @@ -43,8 +43,8 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder< if (!(msg instanceof RemotingCommand)) { return; } - MqttMessage mqttMessage = null; - MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class); + MqttMessage mqttMessage; + MqttHeader mqttHeader = (MqttHeader) msg.readCustomHeader(); Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher .getEncodeDecodeDispatcher().get( MqttMessageType.valueOf(mqttHeader.getMessageType())); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java deleted file mode 100644 index 32735ccd..00000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttConnectPayload.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.transport.mqtt; - -import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.netty.util.internal.StringUtil; -import java.io.UnsupportedEncodingException; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.serialize.RemotingSerializable; - -/** - * Payload of {@link MqttConnectMessage} - */ -public final class RocketMQMqttConnectPayload extends RemotingSerializable { - - private String clientIdentifier; - private String willTopic; - private String willMessage; - private String userName; - private String password; - - public RocketMQMqttConnectPayload( - String clientIdentifier, - String willTopic, - String willMessage, - String userName, - String password) { - this.clientIdentifier = clientIdentifier; - this.willTopic = willTopic; - this.willMessage = willMessage; - this.userName = userName; - this.password = password; - } - - public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) { - return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(), - payload.willMessage(), payload.userName(), payload.password()); - } - - public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException { - return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes( - RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET)); - } - public String getClientIdentifier() { - return clientIdentifier; - } - - public String getWillTopic() { - return willTopic; - } - - public String getWillMessage() { - return willMessage; - } - - public String getUserName() { - return userName; - } - - public String getPassword() { - return password; - } - - public void setClientIdentifier(String clientIdentifier) { - this.clientIdentifier = clientIdentifier; - } - - public void setWillTopic(String willTopic) { - this.willTopic = willTopic; - } - - public void setWillMessage(String willMessage) { - this.willMessage = willMessage; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public void setPassword(String password) { - this.password = password; - } - - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("clientIdentifier=").append(clientIdentifier) - .append(", willTopic=").append(willTopic) - .append(", willMessage=").append(willMessage) - .append(", userName=").append(userName) - .append(", password=").append(password) - .append(']') - .toString(); - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java deleted file mode 100644 index 495ffbcf..00000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.transport.mqtt; - -import io.netty.handler.codec.mqtt.MqttSubAckMessage; -import io.netty.handler.codec.mqtt.MqttSubAckPayload; -import io.netty.util.internal.StringUtil; -import java.io.UnsupportedEncodingException; -import java.util.Collections; -import java.util.List; -import org.apache.rocketmq.remoting.serialize.RemotingSerializable; - -/** - * Payload of {@link MqttSubAckMessage} - */ -public final class RocketMQMqttSubAckPayload extends RemotingSerializable { - - private List grantedQoSLevels; - - public RocketMQMqttSubAckPayload(List grantedQoSLevels) { - this.grantedQoSLevels = Collections.unmodifiableList(grantedQoSLevels); - } - - public List getGrantedQoSLevels() { - return grantedQoSLevels; - } - - public void setGrantedQoSLevels(List grantedQoSLevels) { - this.grantedQoSLevels = grantedQoSLevels; - } - - public static RocketMQMqttSubAckPayload fromMqttSubAckPayload(MqttSubAckPayload payload) { - return new RocketMQMqttSubAckPayload(payload.grantedQoSLevels()); - } - - public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException { - return new MqttSubAckPayload(this.grantedQoSLevels); - } - - @Override - public String toString() { - return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']'; - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java deleted file mode 100644 index 8b2a19d9..00000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.transport.mqtt; - -import io.netty.handler.codec.mqtt.MqttSubscribeMessage; -import io.netty.handler.codec.mqtt.MqttSubscribePayload; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; -import io.netty.util.internal.StringUtil; -import java.io.UnsupportedEncodingException; -import java.util.Collections; -import java.util.List; -import org.apache.rocketmq.remoting.serialize.RemotingSerializable; - -/** - * Payload of {@link MqttSubscribeMessage} - */ -public final class RocketMQMqttSubscribePayload extends RemotingSerializable { - - private List topicSubscriptions; - - public RocketMQMqttSubscribePayload(List topicSubscriptions) { - this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions); - } - - public List getTopicSubscriptions() { - return topicSubscriptions; - } - - public void setTopicSubscriptions( - List topicSubscriptions) { - this.topicSubscriptions = topicSubscriptions; - } - - public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) { - return new RocketMQMqttSubscribePayload(payload.topicSubscriptions()); - } - - public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException { - return new MqttSubscribePayload(this.topicSubscriptions); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); - for (int i = 0; i < topicSubscriptions.size() - 1; i++) { - builder.append(topicSubscriptions.get(i)).append(", "); - } - builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1)); - builder.append(']'); - return builder.toString(); - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java deleted file mode 100644 index 8e748438..00000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.transport.mqtt; - -import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; -import io.netty.util.internal.StringUtil; -import java.io.UnsupportedEncodingException; -import java.util.Collections; -import java.util.List; -import org.apache.rocketmq.remoting.serialize.RemotingSerializable; - -/** - * Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage} - */ -public class RocketMQMqttUnSubscribePayload extends RemotingSerializable { - private List topics; - - public RocketMQMqttUnSubscribePayload(List topics) { - this.topics = topics; - } - - public List getTopics() { - return topics; - } - - public void setTopics(List topics) { - this.topics = Collections.unmodifiableList(topics); - } - - public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) { - return new RocketMQMqttUnSubscribePayload(payload.topics()); - } - - public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException { - return new MqttUnsubscribePayload(this.topics); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); - for (int i = 0; i < topics.size() - 1; i++) { - builder.append("topicName = ").append(topics.get(i)).append(", "); - } - builder.append("topicName = ").append(topics.get(topics.size() - 1)) - .append(']'); - return builder.toString(); - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java index 27e40cf2..6b3cb35c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java @@ -29,15 +29,15 @@ public class EncodeDecodeDispatcher { encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null); - encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null); - encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, new MqttPublishEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.PUBACK, new MqttPubackEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null); encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null); encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null); - encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null); - encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null); - encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null); - encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null); + encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, new MqttSubscribeEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null); encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java index ad0570b2..97d0dfd2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; import io.netty.handler.codec.mqtt.MqttMessage; +import java.io.UnsupportedEncodingException; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -25,5 +26,5 @@ public interface Message2MessageEncodeDecode { RemotingCommand decode(MqttMessage mqttMessage); - MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException; + MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java index 0f0f3d79..61586742 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java @@ -21,18 +21,14 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; -import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { @Override public RemotingCommand decode(MqttMessage mqttMessage) { - RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload - .fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload()); - RemotingCommand requestCommand = null; + RemotingCommand requestCommand; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage .variableHeader(); @@ -57,9 +53,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - CodecHelper.makeCustomHeaderToNet(requestCommand); - - requestCommand.setBody(payload.encode()); + requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload()); return requestCommand; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java index f7c60190..1f3dc2a4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java @@ -37,9 +37,7 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { - MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); - + MqttHeader mqttHeader = (MqttHeader)remotingCommand.readCustomHeader(); return new MqttConnAckMessage( new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java index fde2fcc9..0f11a6b8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java @@ -36,8 +36,7 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { - MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); return new MqttPubAckMessage( new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(), diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java index ee76e95d..1b30acd1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import org.apache.rocketmq.remoting.netty.CodecHelper; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; @@ -30,13 +31,9 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { @Override public RemotingCommand decode(MqttMessage mqttMessage) { - ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload(); - byte[] payload = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(payload); - - RemotingCommand requestCommand = null; + RemotingCommand requestCommand; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); - MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage + MqttPublishVariableHeader variableHeader = (MqttPublishVariableHeader) mqttMessage .variableHeader(); MqttHeader mqttHeader = new MqttHeader(); @@ -46,19 +43,23 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { mqttHeader.setRetain(mqttFixedHeader.isRetain()); mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); - mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name()); - mqttHeader.setSessionPresent(variableHeader.isSessionPresent()); + mqttHeader.setTopicName(variableHeader.topicName()); + mqttHeader.setPacketId(variableHeader.packetId()); requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - CodecHelper.makeCustomHeaderToNet(requestCommand); - - requestCommand.setBody(payload); + //invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead + requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy()); return requestCommand; } @Override public MqttMessage encode(RemotingCommand remotingCommand) { - return null; + MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); + return new MqttPublishMessage( + new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload()); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java index 8c49e61c..c87bc149 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import java.io.UnsupportedEncodingException; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; @@ -36,14 +37,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { } @Override - public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { - MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); - + public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); return new MqttSubAckMessage( new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()), - MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload()); + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload()); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java index 33867281..dec009cf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java @@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; -import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload; public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { @Override public RemotingCommand decode(MqttMessage mqttMessage) { - RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload - .fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload()); - - RemotingCommand requestCommand = null; + RemotingCommand requestCommand; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage .variableHeader(); @@ -49,9 +44,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { requestCommand = RemotingCommand .createRequestCommand(1000, mqttHeader); - CodecHelper.makeCustomHeaderToNet(requestCommand); - - requestCommand.setBody(payload.encode()); + requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload()); return requestCommand; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java index 21f51b64..df0a1d46 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java @@ -36,8 +36,7 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { - MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); return new MqttUnsubAckMessage( new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(), diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java index 88c8518b..957754bb 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java @@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; -import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload; public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode { @Override public RemotingCommand decode(MqttMessage mqttMessage) { - RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload - .fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload()); - - RemotingCommand requestCommand = null; + RemotingCommand requestCommand; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage .variableHeader(); @@ -44,14 +39,10 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); mqttHeader.setRetain(mqttFixedHeader.isRetain()); mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); - mqttHeader.setMessageId(variableHeader.messageId()); - requestCommand = RemotingCommand - .createRequestCommand(1000, mqttHeader); - CodecHelper.makeCustomHeaderToNet(requestCommand); - - requestCommand.setBody(payload.encode()); + requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader); + requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload()); return requestCommand; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 1d6397c0..f06fa0f4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -72,6 +72,7 @@ import org.apache.rocketmq.snode.service.WillMessageService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; +import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; @@ -118,6 +119,7 @@ public class SnodeController { private SlowConsumerService slowConsumerService; private MetricsService metricsService; private WillMessageService willMessageService; + private MqttPushServiceImpl mqttPushService; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( @@ -136,11 +138,15 @@ public class SnodeController { } this.nnodeService = new NnodeServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this); - this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() - .init(this.getNettyClientConfig(), null); + this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient(); + if (this.remotingClient != null) { + this.remotingClient.init(this.getNettyClientConfig(), null); + } this.mqttRemotingClient = RemotingClientFactory.getInstance() - .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) - .init(this.getNettyClientConfig(), null); + .createRemotingClient(RemotingUtil.MQTT_PROTOCOL); + if (this.mqttRemotingClient != null) { + this.mqttRemotingClient.init(this.getNettyClientConfig(), null); + } this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeSendMessageMinPoolSize(), @@ -211,6 +217,7 @@ public class SnodeController { this.slowConsumerService = new SlowConsumerServiceImpl(this); this.metricsService = new MetricsServiceImpl(); this.willMessageService = new WillMessageServiceImpl(this); + this.mqttPushService = new MqttPushServiceImpl(this); } public SnodeConfig getSnodeConfig() { @@ -233,15 +240,21 @@ public class SnodeController { } public boolean initialize() { - this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); - this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL) - .init(this.nettyServerConfig, this.clientHousekeepingService); - this.registerProcessor(); initSnodeInterceptorGroup(); initRemotingServerInterceptorGroup(); initAclInterceptorGroup(); - this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); - this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer(); + if (this.snodeServer != null) { + this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService); + this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + } + this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( + RemotingUtil.MQTT_PROTOCOL); + if (this.mqttRemotingServer != null) { + this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); + this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + } + registerProcessor(); return true; } @@ -313,30 +326,40 @@ public class SnodeController { } public void registerProcessor() { - this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); - this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); + if (snodeServer != null) { + this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); + } + if (mqttRemotingServer != null) { + this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); + } } public void start() { initialize(); - this.snodeServer.start(); - this.mqttRemotingServer.start(); + if (snodeServer != null) { + this.snodeServer.start(); + } + if (mqttRemotingServer != null) { + this.mqttRemotingServer.start(); + } this.remotingClient.start(); - this.mqttRemotingClient.start(); + if (mqttRemotingClient != null) { + this.mqttRemotingClient.start(); + } this.scheduledService.startScheduleTask(); this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval()); this.metricsService.start(this.snodeConfig.getMetricsExportPort()); @@ -526,4 +549,12 @@ public class SnodeController { WillMessageService willMessageService) { this.willMessageService = willMessageService; } + + public MqttPushServiceImpl getMqttPushService() { + return mqttPushService; + } + + public void setMqttPushService(MqttPushServiceImpl mqttPushService) { + this.mqttPushService = mqttPushService; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index 70797711..a8d5b7dd 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -119,6 +120,22 @@ public class SnodeStartup { System.exit(-2); } + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), snodeConfig); + String namesrvAddr = snodeConfig.getNamesrvAddr(); + if (null != namesrvAddr) { + try { + String[] addrArray = namesrvAddr.split(";"); + for (String addr : addrArray) { + RemotingUtil.string2SocketAddress(addr); + } + } catch (Exception e) { + System.out.printf( + "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", + namesrvAddr); + System.exit(-3); + } + } + MixAll.printObjectProperties(log, snodeConfig); MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig()); MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig()); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java index 5ebe7454..abaf30df 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java @@ -44,6 +44,8 @@ public class Client { private boolean cleanSession; + private boolean willFlag; + private String snodeAddress; public ClientRole getClientRole() { @@ -71,13 +73,14 @@ public class Client { language == client.language && isConnected == client.isConnected && cleanSession == client.cleanSession && + willFlag == client.willFlag && snodeAddress == client.snodeAddress; } @Override public int hashCode() { return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, - lastUpdateTimestamp, version, language, isConnected, cleanSession, snodeAddress); + lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress); } public RemotingChannel getRemotingChannel() { @@ -144,6 +147,14 @@ public class Client { this.cleanSession = cleanSession; } + public boolean isWillFlag() { + return willFlag; + } + + public void setWillFlag(boolean willFlag) { + this.willFlag = willFlag; + } + public String getSnodeAddress() { return snodeAddress; } @@ -173,6 +184,7 @@ public class Client { ", language=" + language + ", isConnected=" + isConnected + ", cleanSession=" + cleanSession + + ", willFlag=" + willFlag + ", snodeAddress=" + snodeAddress + '}'; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 5e75322c..899a9ef2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -43,7 +43,7 @@ public class ClientHousekeepingService implements ChannelEventListener { public void start(long interval) { this.producerManager.startScan(interval); this.consumerManager.startScan(interval); - this.iotClientManager.startScan(interval); +// this.iotClientManager.startScan(interval); } public void shutdown() { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java index 133f346a..d0cddcec 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java @@ -144,7 +144,7 @@ public abstract class ClientManagerImpl implements ClientManager { return updated; } - private void removeClient(String groupId, RemotingChannel remotingChannel) { + protected void removeClient(String groupId, RemotingChannel remotingChannel) { ConcurrentHashMap channelTable = groupClientTable.get(groupId); if (channelTable != null) { Client prev = channelTable.remove(remotingChannel); @@ -211,6 +211,9 @@ public abstract class ClientManagerImpl implements ClientManager { } ConcurrentHashMap channelClientMap = groupClientTable .get(groupId); + if (remotingChannel instanceof NettyChannelHandlerContextImpl) { + remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel()); + } return channelClientMap.get(remotingChannel); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java index 442850f0..66ec2f7e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java @@ -19,9 +19,10 @@ package org.apache.rocketmq.snode.client.impl; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; @@ -35,7 +36,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public static final String IOT_GROUP = "IOT_GROUP"; private final SnodeController snodeController; - private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( + private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); @@ -47,9 +48,25 @@ public class IOTClientManagerImpl extends ClientManagerImpl { //do the logic when client sends unsubscribe packet. } + @Override public void onClose(Set groups, RemotingChannel remotingChannel) { + for (String groupId : groups) { + //remove client after invoking onClosed method(client may be used in onClosed) + onClosed(groupId, remotingChannel); + removeClient(groupId, remotingChannel); + } + } + @Override public void onClosed(String group, RemotingChannel remotingChannel) { //do the logic when connection is closed by any reason. + //step1. Clean subscription data if cleanSession=1 + Client client = this.getClient(IOT_GROUP, remotingChannel); + if (client.isCleanSession()) { + cleanSessionState(client.getClientId()); + } + //step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.) + + //step3. If will retain is true, add the will message to retain message. } @Override @@ -63,10 +80,10 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public void cleanSessionState(String clientId) { clientId2Subscription.remove(clientId); - for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry>> next = iterator.next(); - for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) { - Map.Entry> next1 = iterator1.next(); + for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry>> next = iterator.next(); + for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) { + Map.Entry> next1 = iterator1.next(); if (!next1.getKey().getClientId().equals(clientId)) { continue; } @@ -84,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl { return snodeController; } - public ConcurrentHashMap>> getTopic2SubscriptionTable() { + public ConcurrentHashMap>> getTopic2SubscriptionTable() { return topic2SubscriptionTable; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 80ff2d1f..8ae89844 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -17,15 +17,20 @@ package org.apache.rocketmq.snode.processor; -import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttSubscribePayload; import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import org.apache.rocketmq.common.constant.LoggerName; @@ -33,11 +38,9 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; -import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; @@ -83,20 +86,31 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) throws RemotingCommandException, UnsupportedEncodingException { - MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()); MqttMessage mqttMessage = null; switch (fixedHeader.messageType()) { case CONNECT: - MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( + MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader( mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); - RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class); - mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload()); + MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload(); + mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload); + break; + case PUBLISH: + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()); + mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload()); + break; + case SUBSCRIBE: + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()); + mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload()); + break; + case UNSUBSCRIBE: + case PINGREQ: case DISCONNECT: } return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel); @@ -107,11 +121,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { return false; } - private T decode(final byte[] data, Class classOfT) { - final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET)); - return JSON.parseObject(json, classOfT); - } - private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { type2handler.put(type, handler); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java index 8f4a412f..96f2843b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.HashSet; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.mqtt.WillMessage; @@ -92,6 +93,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } //treat a second CONNECT packet as a protocol violation and disconnect if (isConnected(remotingChannel, payload.clientIdentifier())) { + log.error("This client has been connected. The second CONNECT packet is treated as a protocol vialation and the connection will be closed."); remotingChannel.close(); return null; } @@ -118,6 +120,7 @@ public class MqttConnectMessageHandler implements MessageHandler { Client client = new Client(); client.setClientId(payload.clientIdentifier()); client.setClientRole(ClientRole.IOTCLIENT); + client.setGroups(new HashSet(){{add("IOT_GROUP");}}); client.setConnected(true); client.setRemotingChannel(remotingChannel); client.setLastUpdateTimestamp(System.currentTimeMillis()); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java index b5229478..66affac4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java @@ -62,7 +62,9 @@ public class MqttDisconnectMessageHandler implements MessageHandler { snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); } client.setConnected(false); - remotingChannel.close(); + if (remotingChannel.isActive()) { + remotingChannel.close(); + } return null; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java index 6fec9f48..17776ea0 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java @@ -17,13 +17,27 @@ package org.apache.rocketmq.snode.processor.mqtthandler; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.exception.WrongMessageTypeException; +import org.apache.rocketmq.snode.util.MqttUtil; public class MqttPublishMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private final SnodeController snodeController; public MqttPublishMessageHandler(SnodeController snodeController) { @@ -32,7 +46,40 @@ public class MqttPublishMessageHandler implements MessageHandler { @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { + if (!(message instanceof MqttPublishMessage)) { + log.error("Wrong message type! Expected type: PUBLISH but {} was received.", message.fixedHeader().messageType()); + throw new WrongMessageTypeException("Wrong message type exception."); + } + MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message; + MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader(); + MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); + if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) { + log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); + remotingChannel.close(); + return null; + } + + ByteBuf payload = mqttPublishMessage.payload(); + if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { + snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); + } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { + //Push messages to subscribers and add it to IN-FLIGHT messages + } + if (fixedHeader.qosLevel().value() > 0) { + RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); + if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { + mqttHeader.setMessageType(MqttMessageType.PUBACK.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); + mqttHeader.setRetain(false); + mqttHeader.setRemainingLength(2); + mqttHeader.setPacketId(0); + } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { + //PUBREC/PUBREL/PUBCOMP + } + return command; + } return null; } - } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java index 5bbec46d..59999967 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java @@ -25,7 +25,9 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttSubscribePayload; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -36,7 +38,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubAckPayload; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; @@ -80,6 +81,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { if (payload.topicSubscriptions() == null || payload.topicSubscriptions().size() == 0) { log.error("The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); remotingChannel.close(); + return null; } if (isQosLegal(payload.topicSubscriptions())) { log.error("The QoS level of Topic Filter / QoS pairs should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); @@ -97,12 +99,13 @@ public class MqttSubscribeMessageHandler implements MessageHandler { mqttHeader.setDup(false); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setRetain(false); -// mqttHeader.setRemainingLength(0x02); mqttHeader.setMessageId(mqttSubscribeMessage.variableHeader().messageId()); List grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager); - RocketMQMqttSubAckPayload ackPayload = RocketMQMqttSubAckPayload.fromMqttSubAckPayload(new MqttSubAckPayload(grantQoss)); - command.setBody(ackPayload.encode()); + //Publish retained messages to subscribers. + MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss); + command.setPayload(mqttSubAckPayload); + mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size()); command.setRemark(null); command.setCode(ResponseCode.SUCCESS); return command; @@ -111,9 +114,9 @@ public class MqttSubscribeMessageHandler implements MessageHandler { private List doSubscribe(Client client, List mqttTopicSubscriptions, IOTClientManagerImpl iotClientManager) { //do the logic when client sends subscribe packet. - //1.register clientId2Subscription + //1.update clientId2Subscription ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); - ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); + ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); Subscription subscription = null; if (clientId2Subscription.containsKey(client.getClientId())) { subscription = clientId2Subscription.get(client.getClientId()); @@ -128,7 +131,25 @@ public class MqttSubscribeMessageHandler implements MessageHandler { grantQoss.add(actualQos); SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName()); subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData); - //2.register topic2SubscriptionTable + //2.update topic2SubscriptionTable + String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName()); + ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); + if (client2SubscriptionData == null) { + client2SubscriptionData = new ConcurrentHashMap<>(); + ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData); + if (prev != null) { + client2SubscriptionData = prev; + } + Set subscriptionDataSet = client2SubscriptionData.get(client); + if (subscriptionDataSet == null) { + subscriptionDataSet = new HashSet<>(); + Set prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet); + if (prevSubscriptionDataSet != null) { + subscriptionDataSet = prevSubscriptionDataSet; + } + subscriptionDataSet.add(subscriptionData); + } + } } return grantQoss; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java index d2feaa58..38f6519d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java @@ -17,34 +17,109 @@ package org.apache.rocketmq.snode.processor.mqtthandler; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.Client; +import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; +import org.apache.rocketmq.snode.client.impl.Subscription; +import org.apache.rocketmq.snode.exception.WrongMessageTypeException; +import org.apache.rocketmq.snode.util.MqttUtil; /** * handle the UNSUBSCRIBE message from the client - *
    - *
  1. extract topic filters to be un-subscribed
  2. - *
  3. get the topics matching with the topic filters
  4. - *
  5. verify the authorization of the client to the
  6. - *
  7. remove subscription from the SubscriptionStore
  8. - *
*/ public class MqttUnsubscribeMessagHandler implements MessageHandler { -/* private SubscriptionStore subscriptionStore; - - public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) { - this.subscriptionStore = subscriptionStore; - }*/ + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; public MqttUnsubscribeMessagHandler(SnodeController snodeController) { this.snodeController = snodeController; } + @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { - return null; + if (!(message instanceof MqttUnsubscribeMessage)) { + log.error("Wrong message type! Expected type: UNSUBSCRIBE but {} was received. MqttMessage={}", message.fixedHeader().messageType(), message.toString()); + throw new WrongMessageTypeException("Wrong message type exception."); + } + MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) message; + MqttFixedHeader fixedHeader = unsubscribeMessage.fixedHeader(); + if (fixedHeader.isDup() || !fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE) || fixedHeader.isRetain()) { + log.error("Malformed value of reserved bits(bits 3,2,1,0) of fixed header. Expected=0010, received={}{}{}{}", fixedHeader.isDup() ? 1 : 0, Integer.toBinaryString(fixedHeader.qosLevel().value()), fixedHeader.isRetain() ? 1 : 0); + remotingChannel.close(); + return null; + } + MqttUnsubscribePayload payload = unsubscribeMessage.payload(); + if (payload.topics() == null || payload.topics().size() == 0) { + log.error("The payload of a UNSUBSCRIBE packet MUST contain at least one Topic Filter. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); + remotingChannel.close(); + return null; + } + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + if (client == null) { + log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); + remotingChannel.close(); + return null; + } + + RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); + mqttHeader.setMessageType(MqttMessageType.SUBACK.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); + mqttHeader.setRetain(false); + mqttHeader.setMessageId(unsubscribeMessage.variableHeader().messageId()); + + doUnsubscribe(client, payload.topics(), iotClientManager); + + mqttHeader.setRemainingLength(0x02); + command.setRemark(null); + command.setCode(ResponseCode.SUCCESS); + return command; + } + + private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) { + ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); + ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); + + for (String topicFilter : topics) { + //1.update clientId2Subscription + if (clientId2Subscription.containsKey(client.getClientId())) { + Subscription subscription = clientId2Subscription.get(client.getClientId()); + subscription.getSubscriptionTable().remove(topicFilter); + } + //2.update topic2SubscriptionTable + String rootTopic = MqttUtil.getRootTopic(topicFilter); + ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); + if (client2SubscriptionData != null) { + Set subscriptionDataSet = client2SubscriptionData.get(client); + if (subscriptionDataSet != null) { + Iterator iterator = subscriptionDataSet.iterator(); + while (iterator.hasNext()) { + if (iterator.next().getTopic().equals(topicFilter)) + iterator.remove(); + } + } + } + } } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java new file mode 100644 index 00000000..ea8b9734 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.util.ReferenceCountUtil; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.Client; +import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; +import org.apache.rocketmq.snode.constant.SnodeConstant; +import org.apache.rocketmq.snode.util.MqttUtil; + +public class MqttPushServiceImpl { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + private SnodeController snodeController; + private ExecutorService pushMqttMessageExecutorService; + + public MqttPushServiceImpl(final SnodeController snodeController) { + this.snodeController = snodeController; + pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor( + this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(), + this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()), + "SnodePushMqttMessageThread", + false); + } + + public class MqttPushTask implements Runnable { + private AtomicBoolean canceled = new AtomicBoolean(false); + private final ByteBuf message; + private final String topic; + private final Integer qos; + private boolean retain; + private Integer packetId; + + public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain, + Integer packetId) { + this.message = message; + this.topic = topic; + this.qos = qos; + this.retain = retain; + this.packetId = packetId; + } + + @Override + public void run() { + if (!canceled.get()) { + try { + RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); + + //find those clients publishing the message to + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); + Set clients = new HashSet<>(); + if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) { + ConcurrentHashMap> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic)); + for (Map.Entry> entry : client2SubscriptionDatas.entrySet()) { + Set subscriptionDatas = entry.getValue(); + for (SubscriptionData subscriptionData : subscriptionDatas) { + if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) { + clients.add(entry.getKey()); + break; + } + } + } + } + for (Client client : clients) { + RemotingChannel remotingChannel = client.getRemotingChannel(); + if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) { + remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel()); + } + requestCommand.setPayload(message.copy()); + snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + } + } catch (Exception ex) { + log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); + }finally { + System.out.println("Release Bytebuf"); + ReferenceCountUtil.release(message); + } + } else { + log.info("Push message to topic: {} canceled!", topic); + } + } + + private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain, + Integer packetId) { + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(MqttMessageType.PUBLISH.value()); + if (qos == 0) { + mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages + } else { + mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt. + } + mqttHeader.setQosLevel(qos); + mqttHeader.setRetain(retain); + mqttHeader.setPacketId(packetId); + mqttHeader.setTopicName(topic); + mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes()); + + RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); +// pushMessage.setPayload(message); + return pushMessage; + } + + public void setCanceled(AtomicBoolean canceled) { + this.canceled = canceled; + } + + } + + public void pushMessageQos0(final String topic, final ByteBuf message) { + MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0); + pushMqttMessageExecutorService.submit(pushTask); + } + + public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain, + Integer packetId) { + MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId); + pushMqttMessageExecutorService.submit(pushTask); + } + + public void shutdown() { + this.pushMqttMessageExecutorService.shutdown(); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java index 6064491c..ef44a7ac 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.util; +import io.netty.handler.codec.mqtt.MqttQoS; import java.util.UUID; import org.apache.rocketmq.snode.constant.MqttConstant; @@ -33,4 +34,33 @@ public class MqttUtil { public static int actualQos(int qos) { return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos); } + + public static boolean isQosLegal(MqttQoS qos) { + if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) { + return false; + } + return false; + } + + public static boolean isMatch(String topicFiter, String topic) { + if (!topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_PLUS) && !topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_SHARP)) { + return topicFiter.equals(topic); + } + String[] filterTopics = topicFiter.split(MqttConstant.SUBSCRIPTION_SEPARATOR); + String[] actualTopics = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR); + + int i = 0; + for (; i < filterTopics.length && i < actualTopics.length; i++) { + if (MqttConstant.SUBSCRIPTION_FLAG_PLUS.equals(filterTopics[i])) { + continue; + } + if (MqttConstant.SUBSCRIPTION_FLAG_SHARP.equals(filterTopics[i])) { + return true; + } + if (!filterTopics[i].equals(actualTopics[i])) { + return false; + } + } + return i == actualTopics.length; + } } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 85a862f4..6f75a326 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -16,23 +16,18 @@ */ package org.apache.rocketmq.snode.processor; -import com.alibaba.fastjson.JSON; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.ServerConfig; -import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; import org.junit.Before; import org.junit.Test; @@ -93,16 +88,7 @@ public class DefaultMqttMessageProcessorTest { MqttHeader mqttHeader = createMqttConnectMesssageHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); - request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode()); - CodecHelper.makeCustomHeaderToNet(request); + request.setPayload(payload); return request; } - - private byte[] encode(Object obj) { - String json = JSON.toJSONString(obj, false); - if (json != null) { - return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET)); - } - return null; - } } -- GitLab