From 5b561494177c33b5d6e034a6bc3917a88623bebf Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Mon, 11 Feb 2019 22:11:46 +0800 Subject: [PATCH] add SessionManager, WillMessageService;finish CONNECT/DISCONNECT logic --- .../common/message/mqtt/RetainMessage.java | 47 ++++ .../common/message/mqtt/WillMessage.java | 64 ++++++ .../remoting/transport/mqtt/MqttHeader.java | 7 +- .../transport/mqtt/MqttRemotingServer.java | 12 +- .../RemotingCommand2MqttMessageHandler.java | 17 +- .../mqtt/RocketMQMqttSubscribePayload.java | 65 ++++++ .../mqtt/RocketMQMqttUnSubscribePayload.java | 61 ++++++ .../Message2MessageEncodeDecode.java | 3 +- .../MqttConnectackEncodeDecode.java | 20 +- .../dispatcher/MqttPubackEncodeDecode.java | 48 ++++ .../dispatcher/MqttPublishEncodeDecode.java | 64 ++++++ .../dispatcher/MqttSubackEncodeDecode.java | 49 +++++ .../dispatcher/MqttSubscribeEncodeDecode.java | 62 ++++++ .../dispatcher/MqttUnSubackEncodeDecode.java | 48 ++++ .../MqttUnSubscribeEncodeDecode.java | 62 ++++++ .../rocketmq/snode/SnodeController.java | 205 +++++++++--------- .../apache/rocketmq/snode/client/Client.java | 54 +++-- .../snode/exception/MqttConnectException.java | 25 +++ .../DefaultMqttMessageProcessor.java | 34 ++- .../MqttConnectMessageHandler.java | 112 +++++++++- .../MqttDisconnectMessageHandler.java | 36 ++- .../snode/service/WillMessageService.java | 29 +++ .../snode/service/impl/ClientServiceImpl.java | 1 + .../service/impl/WillMessageServiceImpl.java | 48 ++++ .../rocketmq/snode/session/Session.java | 68 ++++++ .../snode/session/SessionManagerImpl.java | 70 ++++++ .../rocketmq/snode/util/MessageUtil.java | 178 --------------- .../apache/rocketmq/snode/util/MqttUtil.java | 28 +++ .../DefaultMqttMessageProcessorTest.java | 4 +- .../MqttDisconnectMessageHandlerTest.java | 60 +++++ .../service/WillMessageServiceImplTest.java | 57 +++++ 31 files changed, 1308 insertions(+), 330 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/mqtt/RetainMessage.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/session/Session.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/session/SessionManagerImpl.java delete mode 100644 snode/src/main/java/org/apache/rocketmq/snode/util/MessageUtil.java create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java create mode 100644 snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java create mode 100644 snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java diff --git a/common/src/main/java/org/apache/rocketmq/common/message/mqtt/RetainMessage.java b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/RetainMessage.java new file mode 100644 index 00000000..15e04ca0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/RetainMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.message.mqtt; + +import io.netty.handler.codec.mqtt.MqttQoS; + +public class RetainMessage { + + private byte[] byteBuf; + + private MqttQoS qoS; + + public byte[] getByteBuf() { + return byteBuf; + } + + public void setByteBuf(byte[] byteBuf) { + this.byteBuf = byteBuf; + } + + public MqttQoS getQoS() { + return qoS; + } + + public void setQoS(MqttQoS qoS) { + this.qoS = qoS; + } + + public String getString() { + return new String(byteBuf); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java new file mode 100644 index 00000000..763e0d74 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.message.mqtt; + +public class WillMessage { + + private String willTopic; + + private byte[] body; + + private boolean isRetain; + + private int qos; + + public String getWillTopic() { + return willTopic; + } + + public void setWillTopic(String willTopic) { + this.willTopic = willTopic; + } + + public byte[] getBody() { + return body; + } + + public void setBody(byte[] body) { + this.body = body; + } + + public boolean isRetain() { + return isRetain; + } + + public void setRetain(boolean retain) { + isRetain = retain; + } + + public int getQos() { + return qos; + } + + public void setQos(int qos) { + this.qos = qos; + } + public String getString() { + return new String(body); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttHeader.java index a2327d06..d9c31b70 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttHeader.java @@ -20,7 +20,6 @@ */ package org.apache.rocketmq.remoting.transport.mqtt; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -40,7 +39,7 @@ public class MqttHeader implements CommandCustomHeader { private int remainingLength; //variable header members - private MqttConnectReturnCode connectReturnCode; + private String connectReturnCode; private boolean sessionPresent; private String name; private Integer version; @@ -95,11 +94,11 @@ public class MqttHeader implements CommandCustomHeader { this.remainingLength = remainingLength; } - public MqttConnectReturnCode getConnectReturnCode() { + public String getConnectReturnCode() { return connectReturnCode; } - public void setConnectReturnCode(MqttConnectReturnCode connectReturnCode) { + public void setConnectReturnCode(String connectReturnCode) { this.connectReturnCode = connectReturnCode; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java index 924ddca2..24ec854a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java @@ -113,31 +113,31 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R } this.publicExecutor = ThreadUtils.newFixedThreadPool( publicThreadNums, - 10000, "Remoting-PublicExecutor", true); + 10000, "MqttRemoting-PublicExecutor", true); if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) { this.eventLoopGroupSelector = new EpollEventLoopGroup( serverConfig.getServerSelectorThreads(), - ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads", + ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads", serverConfig.getServerSelectorThreads())); this.eventLoopGroupBoss = new EpollEventLoopGroup( serverConfig.getServerAcceptorThreads(), - ThreadUtils.newGenericThreadFactory("NettyBossThreads", + ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", serverConfig.getServerAcceptorThreads())); this.socketChannelClass = EpollServerSocketChannel.class; } else { this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), - ThreadUtils.newGenericThreadFactory("NettyBossThreads", + ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", serverConfig.getServerAcceptorThreads())); this.eventLoopGroupSelector = new NioEventLoopGroup( serverConfig.getServerSelectorThreads(), - ThreadUtils.newGenericThreadFactory("NettyNioIoThreads", + ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads", serverConfig.getServerSelectorThreads())); this.socketChannelClass = NioServerSocketChannel.class; } this.port = nettyServerConfig.getMqttListenPort(); this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( serverConfig.getServerWorkerThreads(), - ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", + ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads", serverConfig.getServerWorkerThreads())); loadSslContext(); return this; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java index bfd57e1b..3116d8c1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java @@ -19,8 +19,12 @@ package org.apache.rocketmq.remoting.transport.mqtt; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; import java.util.List; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.EncodeDecodeDispatcher; +import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.Message2MessageEncodeDecode; public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder { @@ -38,6 +42,17 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder< @Override protected void encode(ChannelHandlerContext ctx, RemotingCommand msg, List out) throws Exception { - + if (!(msg instanceof RemotingCommand)) { + return; + } + MqttMessage mqttMessage = null; + MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class); + Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher + .getEncodeDecodeDispatcher().get( + MqttMessageType.valueOf(mqttHeader.getMessageType())); + if (message2MessageEncodeDecode != null) { + mqttMessage = message2MessageEncodeDecode.encode(msg); + } + out.add(mqttMessage); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java new file mode 100644 index 00000000..fceca774 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt; + +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttSubscribePayload; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.util.internal.StringUtil; +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.remoting.serialize.RemotingSerializable; + +/** + * Payload of {@link MqttSubscribeMessage} + */ +public final class RocketMQMqttSubscribePayload extends RemotingSerializable { + + private List topicSubscriptions; + + public RocketMQMqttSubscribePayload(List topicSubscriptions) { + this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions); + } + + public List getTopicSubscriptions() { + return topicSubscriptions; + } + + public void setTopicSubscriptions( + List topicSubscriptions) { + this.topicSubscriptions = topicSubscriptions; + } + public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) { + return new RocketMQMqttSubscribePayload(payload.topicSubscriptions()); + } + + public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException { + return new MqttSubscribePayload(this.topicSubscriptions); + } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + for (int i = 0; i < topicSubscriptions.size() - 1; i++) { + builder.append(topicSubscriptions.get(i)).append(", "); + } + builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1)); + builder.append(']'); + return builder.toString(); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java new file mode 100644 index 00000000..dee32dd0 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt; + +import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; +import io.netty.util.internal.StringUtil; +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.remoting.serialize.RemotingSerializable; + +/** + * Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage} + */ +public class RocketMQMqttUnSubscribePayload extends RemotingSerializable { + private List topics; + + public RocketMQMqttUnSubscribePayload(List topics) { + this.topics = topics; + } + + public List getTopics() { + return topics; + } + + public void setTopics(List topics) { + this.topics = Collections.unmodifiableList(topics); + } + public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) { + return new RocketMQMqttUnSubscribePayload(payload.topics()); + } + + public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException { + return new MqttUnsubscribePayload(this.topics); + } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + for (int i = 0; i < topics.size() - 1; i++) { + builder.append("topicName = ").append(topics.get(i)).append(", "); + } + builder.append("topicName = ").append(topics.get(topics.size() - 1)) + .append(']'); + return builder.toString(); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java index 2c373583..ad0570b2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface Message2MessageEncodeDecode { RemotingCommand decode(MqttMessage mqttMessage); - MqttMessage encode(RemotingCommand remotingCommand); + MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java index 0c603bf3..34a66423 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java @@ -17,8 +17,16 @@ package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { @@ -28,7 +36,15 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { } @Override - public MqttMessage encode(RemotingCommand remotingCommand) { - return null; + public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand + .decodeCommandCustomHeader(MqttHeader.class); + + return new MqttConnAckMessage( + new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader( + MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()), + mqttHeader.isSessionPresent())); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java new file mode 100644 index 00000000..30c4562a --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + return null; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand + .decodeCommandCustomHeader(MqttHeader.class); + + return new MqttPubAckMessage( + new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java new file mode 100644 index 00000000..af2d58ec --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload(); + byte[] payload = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(payload); + + RemotingCommand requestCommand = null; + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage + .variableHeader(); + + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); + mqttHeader.setDup(mqttFixedHeader.isDup()); + mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); + mqttHeader.setRetain(mqttFixedHeader.isRetain()); + mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); + + mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name()); + mqttHeader.setSessionPresent(variableHeader.isSessionPresent()); + + requestCommand = RemotingCommand + .createRequestCommand(1000, mqttHeader); + CodecHelper.makeCustomHeaderToNet(requestCommand); + + requestCommand.setBody(payload); + return requestCommand; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) { + return null; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java new file mode 100644 index 00000000..a27fa4c5 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + return null; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand + .decodeCommandCustomHeader(MqttHeader.class); + + return new MqttSubAckMessage( + new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()),new MqttSubAckPayload()); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java new file mode 100644 index 00000000..645a52f0 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload; + +public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload + .fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload()); + + RemotingCommand requestCommand = null; + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage + .variableHeader(); + + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); + mqttHeader.setDup(mqttFixedHeader.isDup()); + mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); + mqttHeader.setRetain(mqttFixedHeader.isRetain()); + mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); + + mqttHeader.setMessageId(variableHeader.messageId()); + + requestCommand = RemotingCommand + .createRequestCommand(1000, mqttHeader); + CodecHelper.makeCustomHeaderToNet(requestCommand); + + requestCommand.setBody(payload.encode()); + return requestCommand; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) { + return null; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java new file mode 100644 index 00000000..863777a9 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + return null; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand + .decodeCommandCustomHeader(MqttHeader.class); + + return new MqttUnsubAckMessage( + new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java new file mode 100644 index 00000000..37b6fe12 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload; + +public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload + .fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload()); + + RemotingCommand requestCommand = null; + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage + .variableHeader(); + + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); + mqttHeader.setDup(mqttFixedHeader.isDup()); + mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); + mqttHeader.setRetain(mqttFixedHeader.isRetain()); + mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); + + mqttHeader.setMessageId(variableHeader.messageId()); + + requestCommand = RemotingCommand + .createRequestCommand(1000, mqttHeader); + CodecHelper.makeCustomHeaderToNet(requestCommand); + + requestCommand.setBody(payload.encode()); + return requestCommand; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) { + return null; + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index e3ed0e77..7f53849d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.snode; -import io.netty.handler.codec.mqtt.MqttMessageType; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -62,33 +61,26 @@ import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.PullMessageProcessor; import org.apache.rocketmq.snode.processor.SendMessageProcessor; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler; import org.apache.rocketmq.snode.service.ClientService; import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.MetricsService; import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.ScheduledService; +import org.apache.rocketmq.snode.service.WillMessageService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; +import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; +import org.apache.rocketmq.snode.session.SessionManagerImpl; public class SnodeController { private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.SNODE_LOGGER_NAME); + .getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeConfig snodeConfig; private final ServerConfig nettyServerConfig; @@ -108,6 +100,7 @@ public class SnodeController { private ClientManager producerManager; private ClientManager consumerManager; private ClientManager iotClientManager; + private SessionManagerImpl sessionManager; private SubscriptionManager subscriptionManager; private ClientHousekeepingService clientHousekeepingService; private SubscriptionGroupManager subscriptionGroupManager; @@ -124,14 +117,15 @@ public class SnodeController { private ClientService clientService; private SlowConsumerService slowConsumerService; private MetricsService metricsService; + private WillMessageService willMessageService; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "SnodeControllerScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "SnodeControllerScheduledThread")); public SnodeController(ServerConfig nettyServerConfig, - ClientConfig nettyClientConfig, - SnodeConfig snodeConfig) { + ClientConfig nettyClientConfig, + SnodeConfig snodeConfig) { this.nettyClientConfig = nettyClientConfig; this.nettyServerConfig = nettyServerConfig; this.snodeConfig = snodeConfig; @@ -139,37 +133,37 @@ public class SnodeController { this.nnodeService = new NnodeServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() - .init(this.getNettyClientConfig(), null); + .init(this.getNettyClientConfig(), null); this.mqttRemotingClient = RemotingClientFactory.getInstance() - .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) - .init(this.getNettyClientConfig(), null); + .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) + .init(this.getNettyClientConfig(), null); this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodeSendMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodeSendMessageThread", + false); this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHeartBeatCorePoolSize(), - snodeConfig.getSnodeHeartBeatMaxPoolSize(), - 1000 * 60, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), - "SnodeHeartbeatThread", - true); + snodeConfig.getSnodeHeartBeatCorePoolSize(), + snodeConfig.getSnodeHeartBeatMaxPoolSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), + "SnodeHeartbeatThread", + true); // this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor( // snodeConfig.getSnodeSendMessageMinPoolSize(), @@ -181,27 +175,27 @@ public class SnodeController { // false); this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "ConsumerManagerThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "ConsumerManagerThread", + false); this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), - snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), - "SnodeHandleMqttMessageThread", - false); + snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), + snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), + "SnodeHandleMqttMessageThread", + false); if (this.snodeConfig.getNamesrvAddr() != null) { this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", - this.snodeConfig.getNamesrvAddr()); + this.snodeConfig.getNamesrvAddr()); } this.subscriptionGroupManager = new SubscriptionGroupManager(this); @@ -210,17 +204,19 @@ public class SnodeController { this.sendMessageProcessor = new SendMessageProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this); - this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(); + this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this); this.pushService = new PushServiceImpl(this); this.clientService = new ClientServiceImpl(this); this.subscriptionManager = new SubscriptionManagerImpl(); this.producerManager = new ProducerManagerImpl(); this.consumerManager = new ConsumerManagerImpl(this); this.iotClientManager = new IOTClientManagerImpl(this); + this.sessionManager = new SessionManagerImpl(this); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, - this.consumerManager, this.iotClientManager); + this.consumerManager, this.iotClientManager); this.slowConsumerService = new SlowConsumerServiceImpl(this); this.metricsService = new MetricsServiceImpl(); + this.willMessageService = new WillMessageServiceImpl(this); } public SnodeConfig getSnodeConfig() { @@ -229,7 +225,7 @@ public class SnodeController { private void initRemotingServerInterceptorGroup() { List remotingServerInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); + .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { if (this.remotingServerInterceptorGroup == null) { this.remotingServerInterceptorGroup = new InterceptorGroup(); @@ -237,17 +233,17 @@ public class SnodeController { for (Interceptor interceptor : remotingServerInterceptors) { this.remotingServerInterceptorGroup.registerInterceptor(interceptor); log.warn("Remoting server interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } } public boolean initialize() { this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer() - .init(this.nettyServerConfig, this.clientHousekeepingService); + .init(this.nettyServerConfig, this.clientHousekeepingService); this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL) - .init(this.nettyServerConfig, this.clientHousekeepingService); + RemotingUtil.MQTT_PROTOCOL) + .init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); initSnodeInterceptorGroup(); initRemotingServerInterceptorGroup(); @@ -265,7 +261,7 @@ public class SnodeController { } List accessValidators = ServiceProvider - .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The snode dose not load the AccessValidator"); return; @@ -285,7 +281,7 @@ public class SnodeController { //Do not catch the exception RemotingCommand request = requestContext.getRequest(); String remoteAddr = RemotingUtil.socketAddress2IpString( - requestContext.getRemotingChannel().remoteAddress()); + requestContext.getRemotingChannel().remoteAddress()); validator.validate(validator.parse(request, remoteAddr)); } @@ -303,17 +299,17 @@ public class SnodeController { private void initSnodeInterceptorGroup() { List consumeMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { this.consumeMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : consumeMessageInterceptors) { this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); log.warn("Consume message interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } List sendMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) { this.sendMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : sendMessageInterceptors) { @@ -326,57 +322,37 @@ public class SnodeController { public void registerProcessor() { this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, - this.pullMessageExecutor); + this.pullMessageExecutor); this.snodeServer - .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, + .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, - defaultMqttMessageProcessor, handleMqttMessageExecutor); - - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, - new MqttConnectMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, - new MqttDisconnectMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, - new MqttPingreqMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, - new MqttPublishMessageHandler(this)); - defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, - new MqttPubcompMessageHandler(this)); - defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); - defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, - new MqttSubscribeMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, - new MqttUnsubscribeMessagHandler(this)); + defaultMqttMessageProcessor, handleMqttMessageExecutor); } @@ -503,7 +479,7 @@ public class SnodeController { } public void setRemotingServerInterceptorGroup( - InterceptorGroup remotingServerInterceptorGroup) { + InterceptorGroup remotingServerInterceptorGroup) { this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; } @@ -531,6 +507,14 @@ public class SnodeController { this.iotClientManager = iotClientManager; } + public SessionManagerImpl getSessionManager() { + return sessionManager; + } + + public void setSessionManager(SessionManagerImpl sessionManager) { + this.sessionManager = sessionManager; + } + public SubscriptionManager getSubscriptionManager() { return subscriptionManager; } @@ -566,4 +550,13 @@ public class SnodeController { public void setMetricsService(MetricsService metricsService) { this.metricsService = metricsService; } + + public WillMessageService getWillMessageService() { + return willMessageService; + } + + public void setWillMessageService( + WillMessageService willMessageService) { + this.willMessageService = willMessageService; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java index e2bc0da2..e23844e2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java @@ -21,8 +21,10 @@ import java.util.Set; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.snode.client.impl.ClientRole; +import org.apache.rocketmq.snode.session.Session; public class Client { + private ClientRole clientRole; private String clientId; @@ -41,6 +43,8 @@ public class Client { private boolean isConnected; + private Session session; + public ClientRole getClientRole() { return clientRole; } @@ -49,14 +53,16 @@ public class Client { this.clientRole = clientRole; } - @Override public boolean equals(Object o) { - if (this == o) + @Override + public boolean equals(Object o) { + if (this == o) { return true; - if (!(o instanceof Client)) + } + if (!(o instanceof Client)) { return false; + } Client client = (Client) o; - return - version == client.version && + return version == client.version && clientRole == client.clientRole && Objects.equals(clientId, client.clientId) && Objects.equals(groups, client.groups) && @@ -65,8 +71,10 @@ public class Client { isConnected == client.isConnected(); } - @Override public int hashCode() { - return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language, isConnected); + @Override + public int hashCode() { + return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, + lastUpdateTimestamp, version, language, isConnected); } public RemotingChannel getRemotingChannel() { @@ -125,6 +133,14 @@ public class Client { isConnected = connected; } + public Session getSession() { + return session; + } + + public void setSession(Session session) { + session = session; + } + public Set getGroups() { return groups; } @@ -133,18 +149,20 @@ public class Client { this.groups = groups; } - @Override public String toString() { + @Override + public String toString() { return "Client{" + - "clientRole=" + clientRole + - ", clientId='" + clientId + '\'' + - ", groups=" + groups + - ", remotingChannel=" + remotingChannel + - ", heartbeatInterval=" + heartbeatInterval + - ", lastUpdateTimestamp=" + lastUpdateTimestamp + - ", version=" + version + - ", language=" + language + - ", isConnected=" + isConnected + - '}'; + "clientRole=" + clientRole + + ", clientId='" + clientId + '\'' + + ", groups=" + groups + + ", remotingChannel=" + remotingChannel + + ", heartbeatInterval=" + heartbeatInterval + + ", lastUpdateTimestamp=" + lastUpdateTimestamp + + ", version=" + version + + ", language=" + language + + ", isConnected=" + isConnected + + ", session=" + session + + '}'; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java b/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java new file mode 100644 index 00000000..e1e3480e --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.exception; + +public class MqttConnectException extends RuntimeException{ + + public MqttConnectException(String message) { + super(message); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 1ca672ff..f48465c8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -38,15 +38,47 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; +import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private Map type2handler = new HashMap<>(); + private final SnodeController snodeController; private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; + public DefaultMqttMessageProcessor(SnodeController snodeController) { + this.snodeController = snodeController; + registerMessageHandler(MqttMessageType.CONNECT, + new MqttConnectMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.DISCONNECT, + new MqttDisconnectMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PINGREQ, + new MqttPingreqMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PUBLISH, + new MqttPublishMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PUBCOMP, + new MqttPubcompMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.SUBSCRIBE, + new MqttSubscribeMessageHandler(this.snodeController)); + registerMessageHandler(MqttMessageType.UNSUBSCRIBE, + new MqttUnsubscribeMessagHandler(this.snodeController)); + } @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) @@ -80,7 +112,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { return JSON.parseObject(json, classOfT); } - public void registerMessageHanlder(MqttMessageType type, MessageHandler handler) { + private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { type2handler.put(type, handler); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java index fb8691e7..82c52165 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java @@ -17,20 +17,33 @@ package org.apache.rocketmq.snode.processor.mqtthandler; -import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.ClientManager; +import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; +import org.apache.rocketmq.snode.exception.MqttConnectException; +import org.apache.rocketmq.snode.session.Session; +import org.apache.rocketmq.snode.session.SessionManagerImpl; public class MqttConnectMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; @@ -47,14 +60,99 @@ public class MqttConnectMessageHandler implements MessageHandler { MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) message; MqttConnectPayload payload = mqttConnectMessage.payload(); - MqttConnectReturnCode returnCode; - MqttConnAckMessage ackMessage; + RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); + mqttHeader.setMessageType(MqttMessageType.CONNACK.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); + mqttHeader.setRetain(false); + mqttHeader.setRemainingLength(0x02); + /* TODO when clientId.length=0 and cleanSession=0, the server should assign a unique clientId to the client.*/ + //validate clientId + if (StringUtils.isBlank(payload.clientIdentifier()) && !mqttConnectMessage.variableHeader() + .isCleanSession()) { + mqttHeader.setConnectReturnCode( + MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED.name()); + mqttHeader.setSessionPresent(false); + command.setCode(ResponseCode.SYSTEM_ERROR); + command.setRemark("CONNECTION_REFUSED_IDENTIFIER_REJECTED"); + return command; + } + //authentication + if (mqttConnectMessage.variableHeader().hasPassword() && mqttConnectMessage.variableHeader() + .hasUserName() + && !authorized(payload.userName(), payload.password())) { + mqttHeader.setConnectReturnCode( + MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.name()); + mqttHeader.setSessionPresent(false); + command.setCode(ResponseCode.SYSTEM_ERROR); + command.setRemark("CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD"); + return command; + } + //process a second CONNECT packet as a protocol violation and disconnect + if (isConnected(remotingChannel, payload.clientIdentifier())) { + remotingChannel.close(); + return null; + } + //set Session Present according to whether the server has already stored Session State for the clientId + if (mqttConnectMessage.variableHeader().isCleanSession()) { + mqttHeader.setSessionPresent(false); + } else { + + if (alreadyStoredSession(payload.clientIdentifier())) { + mqttHeader.setSessionPresent(true); + } else { + mqttHeader.setSessionPresent(false); + } + } + ClientManager iotClientManager = snodeController.getIotClientManager(); + SessionManagerImpl sessionManager = snodeController.getSessionManager(); + Client client = new Client(); + client.setClientId(payload.clientIdentifier()); + client.setClientRole(ClientRole.IOTCLIENT); + client.setConnected(true); + client.setRemotingChannel(remotingChannel); + client.setLastUpdateTimestamp(System.currentTimeMillis()); + //register remotingChannel<--->client + iotClientManager.register(IOTClientManagerImpl.IOTGROUP, client); + + Session session = new Session(); + session.setClientId(client.getClientId()); + //register client<--->session + sessionManager.register(client.getClientId(), session); - if (isConnected(remotingChannel, mqttConnectMessage.payload().clientIdentifier())) { + //save will message if have + if (mqttConnectMessage.variableHeader().isWillFlag()) { + if (payload.willTopic() == null || payload.willMessageInBytes() == null) { + log.error("Will message and will topic can not be null."); + throw new MqttConnectException("Will message and will topic can not be null."); + } + WillMessage willMessage = new WillMessage(); + willMessage.setQos(mqttConnectMessage.variableHeader().willQos()); + willMessage.setWillTopic(payload.willTopic()); + willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain()); + willMessage.setBody(payload.willMessageInBytes()); + snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); + } + + mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name()); + command.setCode(ResponseCode.SUCCESS); + command.setRemark(null); + return command; + } + private boolean alreadyStoredSession(String clientId) { + SessionManagerImpl sessionManager = snodeController.getSessionManager(); + Session session = sessionManager.getSession(clientId); + if (session != null && session.getClientId().equals(clientId)) { + return true; } -// ChannelHandlerContext ctx = client.getCtx(); - return null; + return false; + } + + private boolean authorized(String username, String password) { + //TODO + return true; } private boolean isConnected(RemotingChannel remotingChannel, String clientId) { @@ -66,7 +164,7 @@ public class MqttConnectMessageHandler implements MessageHandler { return false; } - private boolean isServiceAviable(MqttConnectMessage connectMessage) { + private boolean isServiceAvailable(MqttConnectMessage connectMessage) { int version = connectMessage.variableHeader().version(); return version >= MIN_AVAILABLE_VERSION && version <= MAX_AVAILABLE_VERSION; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java index d07ec9de..1b6deeeb 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java @@ -17,13 +17,22 @@ package org.apache.rocketmq.snode.processor.mqtthandler; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.Client; +import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; public class MqttDisconnectMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory + .getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; @@ -32,13 +41,34 @@ public class MqttDisconnectMessageHandler implements MessageHandler { } /** - * handle the DISCONNECT message from the client
  1. discard the Will Message and Will - * Topic
  2. remove the client from the ClientManager
  3. disconnect the - * connection
+ * handle the DISCONNECT message from the client + *
    + *
  1. discard the Will Message and Will Topic
  2. + *
  3. remove the client from the IOTClientManager
  4. + *
  5. disconnect the connection
  6. + *
*/ @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { // TODO discard the Will Message and Will Topic + MqttFixedHeader fixedHeader = message.fixedHeader(); + if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE || !fixedHeader.isDup() || !fixedHeader + .isRetain()) { + log.error( + "The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}", + fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain()); + remotingChannel.close(); + return null; + } + + //discard will message associated with the current connection(client) + Client client = snodeController.getIotClientManager() + .getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel); + if (client != null) { + snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); + } + client.setConnected(false); + remotingChannel.close(); return null; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java new file mode 100644 index 00000000..516bde5f --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.service; + +import org.apache.rocketmq.common.message.mqtt.WillMessage; + +public interface WillMessageService { + + void saveWillMessage(String clientId, WillMessage willMessage); + + void sendWillMessage(String clientId); + + void deleteWillMessage(String clientId); +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java index 6e49c6a5..f170ae11 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.snode.service.impl; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java new file mode 100644 index 00000000..78b4c1e3 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.service.impl; + +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.service.WillMessageService; + +public class WillMessageServiceImpl implements WillMessageService { + + private static ConcurrentHashMap willMessageTable = new ConcurrentHashMap<>(); + private final SnodeController snodeController; + + public WillMessageServiceImpl(SnodeController snodeController) { + this.snodeController = snodeController; + } + + @Override + public void saveWillMessage(String clientId, WillMessage willMessage) { + willMessageTable.put(clientId, willMessage); + } + + @Override + public void sendWillMessage(String clientId) { + + } + + @Override + public void deleteWillMessage(String clientId) { + willMessageTable.remove(clientId); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/session/Session.java b/snode/src/main/java/org/apache/rocketmq/snode/session/Session.java new file mode 100644 index 00000000..9c1b7c98 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/session/Session.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.session; + +import java.util.Objects; + +public class Session { + + private String clientId; + private volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Session)) { + return false; + } + Session session = (Session) o; + return Objects.equals(clientId, session.clientId); + } + + @Override + public int hashCode() { + return Objects.hash(clientId, lastUpdateTimestamp); + } + + @Override + public String toString() { + return "Session{" + + "clientId='" + clientId + '\'' + + ", lastUpdateTimestamp=" + lastUpdateTimestamp + + '}'; + } +} + + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/session/SessionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/session/SessionManagerImpl.java new file mode 100644 index 00000000..561e8bfc --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/session/SessionManagerImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.session; + +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.snode.SnodeController; + +public class SessionManagerImpl { + + private static final InternalLogger log = InternalLoggerFactory + .getLogger(LoggerName.SNODE_LOGGER_NAME); + + private final ConcurrentHashMap clientSessionTable = new ConcurrentHashMap<>( + 1024); + + private final SnodeController snodeController; + + public SessionManagerImpl(SnodeController snodeController) { + this.snodeController = snodeController; + } + + public boolean register(String clientId, Session session) { + boolean updated = false; + if (clientId != null && session != null) { + Session prev = clientSessionTable.put(clientId, session); + if (prev != null) { + log.info("Session updated, clientId: {} session: {}", clientId, + session); + updated = true; + } else { + log.info("New session registered, clientId: {} session: {}", clientId, + session); + } + session.setLastUpdateTimestamp(System.currentTimeMillis()); + } + return updated; + } + + public void unRegister(String clientId) { + Session prev = clientSessionTable.remove(clientId); + if (prev != null) { + log.info("Unregister session: {} of client, {}", prev, clientId); + } + } + + public Session getSession(String clientId) { + return clientSessionTable.get(clientId); + } + + public SnodeController getSnodeController() { + return snodeController; + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MessageUtil.java b/snode/src/main/java/org/apache/rocketmq/snode/util/MessageUtil.java deleted file mode 100644 index 1eaee029..00000000 --- a/snode/src/main/java/org/apache/rocketmq/snode/util/MessageUtil.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.snode.util; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.mqtt.MqttConnAckMessage; -import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; -import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPubAckMessage; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; -import io.netty.handler.codec.mqtt.MqttSubAckMessage; -import io.netty.handler.codec.mqtt.MqttSubAckPayload; -import io.netty.handler.codec.mqtt.MqttSubscribeMessage; -import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; -import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; -import org.apache.rocketmq.snode.constant.MqttConstant; - -public class MessageUtil { - - public static final String MQTT_QOS_LEVEL = "MQTT_QOS_LEVEL"; - public static final String MQTT_IS_RETAIN = "MQTT_IS_RETAIN"; - public static final String MQTT_PACKET_ID = "MQTT_PACKET_ID"; - public static final String MQTT_TOPIC_NAME = "MQTT_TOPIC_NAME"; - public static final String MQTT_REMAINING_LENGTH = "MQTT_REMAINING_LENGTH"; - public static final String MQTT_IS_DUP = "MQTT_IS_DUP"; - public static final String MQTT_CLIENT_NAME = "MQTT_CLIENT_NAME"; - public static final String MQTT_IS_CLEAN_SESSION = "MQTT_IS_CLEAN_SESSION"; - public static final String MQTT_KEEP_ALIVE_TIME = "MQTT_KEEP_ALIVE_TIME"; - public static final String MQTT_PROTOCOL_VERSION = "MQTT_PROTOCOL_VERSION"; - - public static MqttSubAckMessage getMqttSubackMessage(MqttSubscribeMessage message, - MqttSubAckPayload payload) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.SUBACK, - false, - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - 0 - ); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader - .from(message.variableHeader().messageId()); - - return new MqttSubAckMessage(fixedHeader, variableHeader, payload); - } - - public static MqttPublishMessage getMqttPublishMessage(MqttMessage message, boolean isDup) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBLISH, - isDup, - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - message.fixedHeader().remainingLength() - ); - MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader( - ((MqttPublishVariableHeader)message.variableHeader()).topicName(), - ((MqttPublishVariableHeader)message.variableHeader()).packetId() - ); - ByteBuf buf = Unpooled.buffer(); - buf.writeBytes((byte[]) message.payload()); - return new MqttPublishMessage(fixedHeader, variableHeader, buf); - } - - public static MqttConnAckMessage getMqttConnackMessage(MqttConnectMessage message, - MqttConnectReturnCode returnCode) { - assert message.fixedHeader().messageType() == MqttMessageType.CONNECT; - MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader( - returnCode, - message.variableHeader().isCleanSession() - ); - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.CONNACK, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - 0); - return new MqttConnAckMessage(fixedHeader, variableHeader); - } - - public static MqttPubAckMessage getMqttPubackMessage(MqttPublishMessage message) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBACK, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - message.fixedHeader().remainingLength() - ); - - return new MqttPubAckMessage(fixedHeader, - MqttMessageIdVariableHeader.from(message.variableHeader().packetId())); - - } - - public static MqttMessage getMqttPubrecMessage(MqttPublishMessage message) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBREC, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - message.fixedHeader().remainingLength() - ); - return new MqttMessage(fixedHeader); - } - - public static MqttMessage getMqttPubrelMessage(MqttMessage message) { - assert message.fixedHeader().messageType() == MqttMessageType.PUBREC; - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBREL, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - message.fixedHeader().remainingLength() - ); - return new MqttMessage(fixedHeader); - } - - public static MqttMessage getMqttPubcompMessage(MqttMessage message) { - assert message.fixedHeader().messageType() == MqttMessageType.PUBREL; - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PUBCOMP, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - message.fixedHeader().remainingLength() - ); - return new MqttMessage(fixedHeader); - } - - public static MqttMessage getMqttPingrespMessage(MqttMessage message) { - assert message.fixedHeader().messageType() == MqttMessageType.PINGREQ; - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PINGRESP, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - 0 - ); - return new MqttMessage(fixedHeader); - } - - public static MqttUnsubAckMessage getMqttUnsubackMessage(MqttUnsubscribeMessage message) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.UNSUBACK, - message.fixedHeader().isDup(), - message.fixedHeader().qosLevel(), - message.fixedHeader().isRetain(), - 0 - ); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader - .from(message.variableHeader().messageId()); - return new MqttUnsubAckMessage(fixedHeader, variableHeader); - } - - public static int actualQos(int qos) { - return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos); - } -} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java new file mode 100644 index 00000000..fc40b552 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.util; + +import java.util.UUID; + +public class MqttUtil { + + public static String generateClientId() { + return UUID.randomUUID().toString(); + } + +} diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 71863068..1c4e8fde 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -34,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -60,13 +59,12 @@ public class DefaultMqttMessageProcessorTest { @Before public void init() { - defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(); + defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController); } @Test public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException { RemotingCommand request = createMqttConnectMesssageCommand(); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(snodeController)); defaultMqttMessageProcessor.processRequest(remotingChannel, request); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java new file mode 100644 index 00000000..b691342f --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.processor; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.Client; +import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MqttDisconnectMessageHandlerTest { + + @Mock + private RemotingChannel remotingChannel; + + @Test + public void testHandlerMessage() throws Exception { + + SnodeController snodeController = new SnodeController(new ServerConfig(), + new ClientConfig(), new SnodeConfig()); + MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( + snodeController); + Client client = new Client(); + client.setRemotingChannel(remotingChannel); + client.setClientId("123456"); + snodeController.getIotClientManager().register(IOTClientManagerImpl.IOTGROUP, client); + snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage()); + MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader( + MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); + + mqttDisconnectMessageHandler.handleMessage(mqttDisconnectMessage, remotingChannel); + } +} diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java new file mode 100644 index 00000000..12a7fe75 --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service; + +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.SnodeTestBase; +import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class WillMessageServiceImplTest extends SnodeTestBase { + + @Spy + private SnodeController snodeController = new SnodeController(new ServerConfig(), + new ClientConfig(), new SnodeConfig()); + + private WillMessageService willMessageService; + + @Before + public void init() { + willMessageService = new WillMessageServiceImpl(snodeController); + } + + @Test + public void saveWillMessageTest() { + willMessageService.saveWillMessage("testClientId", new WillMessage()); + } + + @Test + public void deleteWillMessageTest() { + willMessageService.saveWillMessage("testClientId", new WillMessage()); + willMessageService.deleteWillMessage("testClientId"); + } + +} -- GitLab