diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java index d867476fd2da9466de6c1255046900c02e2aa69a..dd1972d5f8edcff46e0828a9027be0166e3b1253 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java @@ -17,14 +17,22 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; public class MqttPingreqMessageHandler implements MessageHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); @@ -43,6 +51,24 @@ public class MqttPingreqMessageHandler implements MessageHandler { */ @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { - return null; + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); + Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + log.debug("Handle MQTT client: {} Pingreq.", client.getClientId()); + RemotingCommand response = RemotingCommand.createResponseCommand(MqttHeader.class); + if (client != null && client.isConnected()) { + client.setLastUpdateTimestamp(System.currentTimeMillis()); + MqttHeader mqttHeader = (MqttHeader) response.readCustomHeader(); + mqttHeader.setMessageType(MqttMessageType.PINGRESP.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(0); + mqttHeader.setRetain(false); + mqttHeader.setRemainingLength(0); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MQTT Client is null or not connected"); + return response; } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 563fdac4378a14c3d25680ece7651764b187546c..cb283144b7206ae849a90dcf1c8fbc8fb7fc1691 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -132,6 +132,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { break; case UNSUBSCRIBE: case PINGREQ: + break; case DISCONNECT: } return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel); diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..204cb8b2e05cac4f6839f96cb9ad255febfdfe52 --- /dev/null +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt; + +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +@RunWith(MockitoJUnitRunner.class) +public class MqttPingreqMessageHandlerTest { + @Mock + private RemotingChannel remotingChannel; + @Mock + private IOTClientManagerImpl iotClientManager; + @Mock + private MqttMessage mqttMessage; + @Mock + private Client client; + @Mock + private DefaultMqttMessageProcessor processor; + + private MqttPingreqMessageHandler mqttPingreqMessageHandler; + + @Before + public void init() { + mqttPingreqMessageHandler = new MqttPingreqMessageHandler(processor); + when(processor.getIotClientManager()).thenReturn(iotClientManager); + when(iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel)).thenReturn(client); + when(client.getClientId()).thenReturn("Mock Client"); + } + + @Test + public void testHandlerMessageReturnResp() { + when(client.isConnected()).thenReturn(true); + RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel); + verify(client).setLastUpdateTimestamp(anyLong()); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + } + + @Test + public void testHandlerMessageReturnNull() { + when(client.isConnected()).thenReturn(false); + RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel); + assertEquals(ResponseCode.SYSTEM_ERROR,response.getCode()); + + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java index 2c17f912c5a4b941f7355b94ecbe1cd5538c376a..3415e3a22c72f859ecc74e05e3e0ca35db1f59fa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java @@ -38,8 +38,8 @@ public class EncodeDecodeDispatcher { encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode()); - encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null); - encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null); + encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, new MqttPingReqEncodeDecode() ); + encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, new MqttPingRespEncodeDecode()); } public static Map getEncodeDecodeDispatcher() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java new file mode 100644 index 0000000000000000000000000000000000000000..569a0c901ddf1b0aa1c413ea6fa5b7aac58a520c --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import java.io.UnsupportedEncodingException; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttPingReqEncodeDecode implements Message2MessageEncodeDecode { + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + MqttHeader mqttHeader = new MqttHeader(); + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); + mqttHeader.setDup(mqttFixedHeader.isDup()); + mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); + mqttHeader.setRetain(mqttFixedHeader.isRetain()); + mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); + return RemotingCommand.createRequestCommand(1000, mqttHeader); + } + + @Override + public MqttMessage encode( + RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException { + return null; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java new file mode 100644 index 0000000000000000000000000000000000000000..df7431b33031e54323d90e4d44c4399edd48a824 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.io.UnsupportedEncodingException; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MqttPingRespEncodeDecode implements Message2MessageEncodeDecode { + @Override + public RemotingCommand decode(MqttMessage mqttMessage) { + return null; + } + + @Override + public MqttMessage encode( + RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException { + MqttHeader mqttHeader = (MqttHeader) remotingCommand.getCustomHeader(); + return new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength())); + + } +} +