diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index d287160218c644a57abb96adeae4b2f87bd7a211..d392e79ad7eba9aac9803eab82fa2bf187861105 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -17,8 +17,6 @@ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson.annotation.JSONField; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -176,34 +174,6 @@ public class RemotingCommand { return CodecHelper.decodeCommandCustomHeader(this, classHeader); } - public void makeCustomHeaderToNet() { - if (this.customHeader != null) { - Field[] fields = customHeader.getClass().getDeclaredFields(); - if (null == this.extFields) { - this.extFields = new HashMap(); - } - - for (Field field : fields) { - if (!Modifier.isStatic(field.getModifiers())) { - String name = field.getName(); - if (!name.startsWith("this")) { - Object value = null; - try { - field.setAccessible(true); - value = field.get(this.customHeader); - } catch (Exception e) { - log.error("Failed to access field [{}]", name, e); - } - - if (value != null) { - this.extFields.put(name, value.toString()); - } - } - } - } - } - } - public ByteBuffer encodeHeader(final int bodyLength) { return CodecHelper.encodeHeader(bodyLength, this); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java deleted file mode 100644 index f999752e13bcf140014efdd2cc7124be9e6e5567..0000000000000000000000000000000000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.transport.mqtt; - -import com.alibaba.fastjson.JSON; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessage; -import java.nio.charset.Charset; -import java.util.List; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder { - - /** - * Decode from one message to an other. This method will be called for each written message that - * can be handled by this encoder. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} - * belongs to - * @param msg the message to decode to an other one - * @param out the {@link List} to which decoded messages should be added - * @throws Exception is thrown if an error occurs - */ - @Override - protected void decode(ChannelHandlerContext ctx, MqttMessage msg, List out) - throws Exception { - if (!(msg instanceof MqttMessage)) { - return; - } - RemotingCommand requestCommand = null; - MqttFixedHeader mqttFixedHeader = msg.fixedHeader(); - Object variableHeader = msg.variableHeader(); - - switch (msg.fixedHeader().messageType()) { - case CONNECT: - RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload - .fromMqttConnectPayload(((MqttConnectMessage) msg).payload()); - MqttHeader mqttHeader = new MqttHeader(); - mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); - mqttHeader.setDup(mqttFixedHeader.isDup()); - mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); - mqttHeader.setRetain(mqttFixedHeader.isRetain()); - mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); - - MqttConnectVariableHeader mqttConnectVariableHeader = (MqttConnectVariableHeader) variableHeader; - mqttHeader.setName(mqttConnectVariableHeader.name()); - mqttHeader.setVersion(mqttConnectVariableHeader.version()); - mqttHeader.setHasUserName(mqttConnectVariableHeader.hasUserName()); - mqttHeader.setHasPassword(mqttConnectVariableHeader.hasPassword()); - mqttHeader.setWillRetain(mqttConnectVariableHeader.isWillRetain()); - mqttHeader.setWillQos(mqttConnectVariableHeader.willQos()); - mqttHeader.setWillFlag(mqttConnectVariableHeader.isWillFlag()); - mqttHeader.setCleanSession(mqttConnectVariableHeader.isCleanSession()); - mqttHeader - .setKeepAliveTimeSeconds(mqttConnectVariableHeader.keepAliveTimeSeconds()); - - requestCommand = RemotingCommand - .createRequestCommand(1000, mqttHeader); - requestCommand.makeCustomHeaderToNet(); - - requestCommand.setBody(payload.encode()); - out.add(requestCommand); - case CONNACK: - case DISCONNECT: - case PUBLISH: - case PUBACK: - case PUBREC: - case PUBREL: - case PUBCOMP: - case SUBSCRIBE: - case SUBACK: - case UNSUBSCRIBE: - case UNSUBACK: - case PINGREQ: - case PINGRESP: - } - } - - private byte[] encode(Object obj) { - String json = JSON.toJSONString(obj, false); - if (json != null) { - return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET)); - } - return null; - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..e190695913445476d92879cee5ae39c0e73c20ab --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttMessage2RemotingCommandHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.mqtt.MqttMessage; +import java.util.List; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.EncodeDecodeDispatcher; +import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.Message2MessageEncodeDecode; + +public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder { + + /** + * Decode from one message to an other. This method will be called for each written message that + * can be handled by this encoder. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} + * belongs to + * @param msg the message to decode to an other one + * @param out the {@link List} to which decoded messages should be added + * @throws Exception is thrown if an error occurs + */ + @Override + protected void decode(ChannelHandlerContext ctx, MqttMessage msg, List out) + throws Exception { + if (!(msg instanceof MqttMessage)) { + return; + } + RemotingCommand requestCommand = null; + Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher + .getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType()); + if (message2MessageEncodeDecode != null) { + requestCommand = message2MessageEncodeDecode.decode(msg); + } + out.add(requestCommand); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java index 8f9862d43930d0d0f73b452787cece77f3cc760f..05dd1b258cd3f1212921a3ed5d3da201f36c857e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingClient.java @@ -188,22 +188,22 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R try { long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { - throw new RemotingTimeoutException("invokeSync call timeout"); + throw new RemotingTimeoutException("InvokeSync call timeout"); } RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis - costTime); return response; } catch (RemotingException remotingException) { if (remotingException instanceof RemotingSendRequestException) { - log.warn("invokeSync: send request exception, so close the channel[{}]", addr); + log.warn("InvokeSync: send request exception, so close the channel[{}]", addr); this.closeRemotingChannel(addr, remotingChannel); throw (RemotingSendRequestException) remotingException; } if (remotingException instanceof RemotingTimeoutException) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeRemotingChannel(addr, remotingChannel); - log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); + log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } - log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); + log.warn("InvokeSync: wait response timeout exception, the channel[{}]", addr); throw (RemotingTimeoutException) remotingException; } } @@ -229,11 +229,11 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R try { long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { - throw new RemotingTooMuchRequestException("invokeAsync call timeout"); + throw new RemotingTooMuchRequestException("InvokeAsync call timeout"); } this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback); } catch (RemotingSendRequestException e) { - log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); + log.warn("InvokeAsync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java index 3400e013d89b33f3f8fd723317b555a3a7739017..924ddca24e20eadd7d4c4c29a4f3285cbd88ec75 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java @@ -182,8 +182,8 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R .addLast(defaultEventExecutorGroup, new MqttDecoder(), MqttEncoder.INSTANCE, - new Mqtt2RemotingCommandHandler(), - new RemotingCommand2MqttHandler(), + new MqttMessage2RemotingCommandHandler(), + new RemotingCommand2MqttMessageHandler(), new IdleStateHandler(nettyServerConfig .getConnectionChannelReaderIdleSeconds(), nettyServerConfig diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java similarity index 94% rename from remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttHandler.java rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java index 30770891987ddf8484c5fb90380f43d2bb3b6bbc..bfd57e1b56555633ab36c557c4f814c68382e4dc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RemotingCommand2MqttMessageHandler.java @@ -22,7 +22,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class RemotingCommand2MqttHandler extends MessageToMessageEncoder { +public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder { /** * Encode from one message to an other. This method will be called for each written message that diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..27e40cf25006e37b77b3810bbd00a7debd944dcf --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttMessageType; +import java.util.HashMap; +import java.util.Map; + +public class EncodeDecodeDispatcher { + + private static Map encodeDecodeDispatcher = new HashMap<>(); + + static { + encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode()); + encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null); + encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null); + encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null); + encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null); + encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null); + encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null); + encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null); + encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null); + } + + public static Map getEncodeDecodeDispatcher() { + return encodeDecodeDispatcher; + } + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java new file mode 100644 index 0000000000000000000000000000000000000000..2c3735836d73b76bdf81d08f2f9d4b86428c4d5c --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/Message2MessageEncodeDecode.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface Message2MessageEncodeDecode { + + RemotingCommand decode(MqttMessage mqttMessage); + + MqttMessage encode(RemotingCommand remotingCommand); +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java new file mode 100644 index 0000000000000000000000000000000000000000..0f0f3d79440958787064e68dfa0cd2d68b9a0ff8 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectEncodeDecode.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; + +public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload + .fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload()); + RemotingCommand requestCommand = null; + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage + .variableHeader(); + + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); + mqttHeader.setDup(mqttFixedHeader.isDup()); + mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); + mqttHeader.setRetain(mqttFixedHeader.isRetain()); + mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); + + mqttHeader.setName(variableHeader.name()); + mqttHeader.setVersion(variableHeader.version()); + mqttHeader.setHasUserName(variableHeader.hasUserName()); + mqttHeader.setHasPassword(variableHeader.hasPassword()); + mqttHeader.setWillRetain(variableHeader.isWillRetain()); + mqttHeader.setWillQos(variableHeader.willQos()); + mqttHeader.setWillFlag(variableHeader.isWillFlag()); + mqttHeader.setCleanSession(variableHeader.isCleanSession()); + mqttHeader + .setKeepAliveTimeSeconds(variableHeader.keepAliveTimeSeconds()); + + requestCommand = RemotingCommand + .createRequestCommand(1000, mqttHeader); + CodecHelper.makeCustomHeaderToNet(requestCommand); + + requestCommand.setBody(payload.encode()); + return requestCommand; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) { + return null; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java new file mode 100644 index 0000000000000000000000000000000000000000..0c603bf3accbf881479e5aba60f7d2903a62058b --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { + + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + return null; + } + + @Override + public MqttMessage encode(RemotingCommand remotingCommand) { + return null; + } +} diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java index abfff90ce0d8366ca73e8f317d9d1845f57e03a1..b610057fecfd0c2abccc67eac1a7bb3da447ce5e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -16,10 +16,39 @@ */ package org.apache.rocketmq.snode.processor; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.config.SnodeConfig; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; +import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class MqttConnectMessageHandlerTest { + @Mock + private RemotingChannel remotingChannel; + + @Test + public void testHandlerMessage() throws Exception { + + MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler( + new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig())); + + MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); + MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( + MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes())); + + mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel); + } }