diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java index 57562dd672d50a6216d80b7ae8ce0eed4b4a81d8..1ba191fcdb1678e2bc123ed013d0a9f3e2ce9664 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java @@ -26,7 +26,7 @@ public class InFlightMessage { private final String messageId; private final long queueOffset; - InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId, + public InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId, long queueOffset) { this.topic = topic; this.pushQos = pushQos; diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java index 7ed89562e6b2bda57bb530186d46535c7e75b5ed..32990a4b1b2a5e5b513b3b69c11fd3be86bbef36 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java @@ -45,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import static org.apache.rocketmq.mqtt.constant.MqttConstant.TOPIC_CLIENTID_SEPARATOR; + public class MQTTSession extends Client { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); @@ -110,6 +112,11 @@ public class MQTTSession extends Client { return Objects.equals(this.getClientId(), client.getClientId()); } + @Override + public int hashCode() { + return Objects.hash(this.getClientId()); + } + public boolean isConnected() { return isConnected; } @@ -143,7 +150,7 @@ public class MQTTSession extends Client { inflightSlots.decrementAndGet(); mqttHeader.setPacketId(getNextPacketId()); inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getMsgId(), messageExt.getQueueOffset())); - inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS)); +// inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS)); put2processTable(((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt); pushMessage2Client(mqttHeader, messageExt.getBody()); } @@ -155,7 +162,7 @@ public class MQTTSession extends Client { ConcurrentHashMap>> processTable = ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(); ConcurrentHashMap> map = processTable.get(remove.getBrokerData().getBrokerName()); if (map != null) { - TreeMap treeMap = map.get(rootTopic + "@" + this.getClientId()); + TreeMap treeMap = map.get(rootTopic + TOPIC_CLIENTID_SEPARATOR + this.getClientId()); if (treeMap != null) { treeMap.remove(remove.getQueueOffset()); } @@ -193,10 +200,10 @@ public class MQTTSession extends Client { MessageExt messageExt) { ConcurrentHashMap> map; TreeMap treeMap; - String offsetKey = rootTopic + "@" + this.getClientId(); - if (processTable.contains(brokerName)) { + String offsetKey = rootTopic + TOPIC_CLIENTID_SEPARATOR + this.getClientId(); + if (processTable.containsKey(brokerName)) { map = processTable.get(brokerName); - if (map.contains(offsetKey)) { + if (map.containsKey(offsetKey)) { treeMap = map.get(offsetKey); treeMap.putIfAbsent(messageExt.getQueueOffset(), messageExt); } else { @@ -252,4 +259,11 @@ public class MQTTSession extends Client { return inflightWindow; } + public DelayQueue getInflightTimeouts() { + return inflightTimeouts; + } + + public Hashtable getInUsePacketIds() { + return inUsePacketIds; + } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java index 71f08af8c1bd92acf8bd128e40f9ec0762d26f66..5a4a4c928dd1c524a8a89d65d903aed5cf90086c 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java @@ -25,6 +25,7 @@ public class MqttConstant { public static final String SUBSCRIPTION_FLAG_PLUS = "+"; public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_SEPARATOR = "/"; + public static final String TOPIC_CLIENTID_SEPARATOR = "@"; public static final long DEFAULT_TIMEOUT_MILLS = 3000L; public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS"; public static final AttributeKey MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java index 679a2942aa0c6e7f1dfff6ca2e33a93cbafc88ba..8b118d9124563b937a06ef29b944b819618cd280 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.mqtt.mqtthandler; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.Enumeration; import java.util.HashSet; @@ -56,7 +57,7 @@ public interface MessageHandler { if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) { Set clients = topic2Clients.get(MqttUtil.getRootTopic(topic)); for (Client client : clients) { - if(((MQTTSession)client).isConnected()) { + if (((MQTTSession) client).isConnected()) { Subscription subscription = clientId2Subscription.get(client.getClientId()); Enumeration keys = subscription.getSubscriptionTable().keys(); while (keys.hasMoreElements()) { @@ -71,18 +72,18 @@ public interface MessageHandler { return clientsTobePush; } - default RemotingCommand doResponse(MqttFixedHeader fixedHeader) { + default RemotingCommand doResponse(MqttFixedHeader fixedHeader, MqttPublishVariableHeader variableHeader) { if (fixedHeader.qosLevel().value() > 0) { RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); - if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { + if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { mqttHeader.setMessageType(MqttMessageType.PUBACK.value()); mqttHeader.setDup(false); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setRetain(false); mqttHeader.setRemainingLength(2); - mqttHeader.setPacketId(0); - } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { + mqttHeader.setPacketId(variableHeader.packetId()); + } else if (fixedHeader.qosLevel().equals(MqttQoS.EXACTLY_ONCE)) { //PUBREC/PUBREL/PUBCOMP } return command; diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java index f336105a55d438291206ac24750c7ed17171c2c3..12c9b8cb8bbe08eb63dea7d5afa1743050fda64c 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwardHandler.java @@ -91,7 +91,7 @@ public class MqttMessageForwardHandler implements MessageHandler { //add task to orderedExecutor this.defaultMqttMessageProcessor.getOrderedExecutor().executeOrdered(client.getClientId(), SafeRunnable.safeRun(mqttPushTask)); } - return doResponse(fixedHeader); + return doResponse(fixedHeader, variableHeader); } return null; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java index 27641e726b667d3ca302789f5a4e17be6c5e8f6a..7248972740154301209d13145bcc60b0fe3f3d23 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java @@ -92,7 +92,7 @@ public class MqttPublishMessageHandler implements MessageHandler { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message; MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader(); MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); - if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) { + if (!MqttUtil.isQosLegal(fixedHeader.qosLevel())) { log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); remotingChannel.close(); return null; @@ -128,7 +128,7 @@ public class MqttPublishMessageHandler implements MessageHandler { } finally { ReferenceCountUtil.release(message); } - + break; case AT_LEAST_ONCE: // Store msg and invoke callback to publish msg to subscribers // 1. Check if the root topic has been created @@ -183,11 +183,11 @@ public class MqttPublishMessageHandler implements MessageHandler { log.error("Store Qos=1 Message error: {}", ex); } }); - + break; case EXACTLY_ONCE: throw new MqttRuntimeException("Qos = 2 messages are not supported yet."); } - return doResponse(fixedHeader); + return doResponse(fixedHeader, variableHeader); } private void transferMessage(Set snodeAddresses, String topic, byte[] body) throws MqttException { diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java index 90171241e4b4d0639d9413da37f53a1dfd8b7acb..55f388a97025255991fea6a1be14c0642c808312 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java @@ -59,7 +59,8 @@ public class InnerMqttMessageProcessor implements RequestProcessor { private NnodeService nnodeService; private MqttMessageForwardHandler mqttMessageForwardHandler; - public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, RemotingServer innerMqttRemotingServer) { + public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, + RemotingServer innerMqttRemotingServer) { this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService(); this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager(); @@ -73,14 +74,14 @@ public class InnerMqttMessageProcessor implements RequestProcessor { public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader(); - if(mqttHeader.getMessageType().equals(PUBLISH)){ + if (mqttHeader.getMessageType().equals(PUBLISH)) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()); MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()); MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody())); return mqttMessageForwardHandler.handleMessage(mqttMessage, remotingChannel); - }else{ + } else { return defaultMqttMessageProcessor.processRequest(remotingChannel, message); } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java index 48c3cd8def435e0168fd26c4e063398626c66fa5..e865aa9e98e231329e2ff85c4d9646c55a29ac23 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java @@ -113,7 +113,7 @@ public class MqttPushTask implements Runnable { //push message if in-flight window has slot(not full) client.pushMessageQos1(mqttHeader, messageExt, brokerData); } - if (inflightFullFlag == true) { + if (inflightFullFlag) { break; } maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java index 3eefc48ea76e987c83d84a9d725543ac20885e9c..d7327c68a7580cc3caaf5dd94b77d3911d8ecfb8 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java @@ -39,7 +39,7 @@ public class MqttUtil { if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) { return false; } - return false; + return true; } public static boolean isMatch(String topicFiter, String topic) { diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MathUtils.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MathUtils.java index 85196c18be70be6891df2b51da73c901cb7f2eab..7101de0a572fdfa71afe1c2364b0753ad3496ea5 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MathUtils.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MathUtils.java @@ -36,10 +36,6 @@ public class MathUtils { return mod; } - public static int findNextPositivePowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - /** * Current time from some arbitrary time base in the past, counting in * nanoseconds, and not affected by settimeofday or similar system clock @@ -55,17 +51,6 @@ public class MathUtils { return System.nanoTime(); } - /** - * Milliseconds elapsed since the time specified, the input is nanoTime - * the only conversion happens when computing the elapsed time. - * - * @param startNanoTime the start of the interval that we are measuring - * @return elapsed time in milliseconds. - */ - public static long elapsedMSec(long startNanoTime) { - return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND; - } - /** * Microseconds elapsed since the time specified, the input is nanoTime * the only conversion happens when computing the elapsed time. diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java index 0dab4fad8c7ce38e22fc073fe5d8edf715d0aebe..0744e93d11bc110cbe558f6bd31bc850fe7edd83 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java index 570a86403e65a3b155ce0cfb9c12c7bac6ab5410..8ff04cc98a0dead89e1f62ffab36c3b086a4022d 100644 --- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java @@ -19,12 +19,13 @@ package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; -import java.io.UnsupportedEncodingException; -import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.exception.MQClientException; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer; @@ -37,6 +38,8 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class DefaultMqttMessageProcessorTest { + + @Mock private DefaultMqttMessageProcessor defaultMqttMessageProcessor; @Mock @@ -53,11 +56,10 @@ public class DefaultMqttMessageProcessorTest { @Before public void init() { - defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer); } @Test - public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException { + public void testProcessRequest() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { RemotingCommand request = createMqttConnectMesssageCommand(); defaultMqttMessageProcessor.processRequest(remotingChannel, request); } diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MQTTSessionTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MQTTSessionTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c5de1d719072900b632507744049361a12118c4a --- /dev/null +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MQTTSessionTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt; + +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.ClientRole; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.mqtt.client.InFlightMessage; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; + +@RunWith(MockitoJUnitRunner.class) +public class MQTTSessionTest { + + private DefaultMqttMessageProcessor defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null); + private MQTTSession mqttSession = new MQTTSession("testClient", ClientRole.IOTCLIENT, new HashSet() { + { + add("IOT_GROUP"); + } + }, true, true, null, System.currentTimeMillis(), defaultMqttMessageProcessor); + + private ConcurrentHashMap>> processTable = new ConcurrentHashMap<>(); + + @Test + public void test_put2processTable() throws Exception { + Method method = MQTTSession.class.getDeclaredMethod("put2processTable", ConcurrentHashMap.class, String.class, String.class, MessageExt.class); + for (int i = 0; i < 5; i++) { + MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null); + messageExt.setQueueOffset(i); + method.setAccessible(true); + method.invoke(mqttSession, processTable, "broker1", "topic" + i, messageExt); + } + MessageExt messageExt_0 = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null); + messageExt_0.setQueueOffset(10); + method.invoke(mqttSession, processTable, "broker2", "topic0", messageExt_0); + + MessageExt messageExt_1 = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null); + messageExt_1.setQueueOffset(11); + method.invoke(mqttSession, processTable, "broker2", "topic0", messageExt_1); + + assertEquals(2, processTable.size()); + assertEquals(5, processTable.get("broker1").size()); + assertEquals(2, processTable.get("broker2").get("topic0@testClient").size()); + } + + @Test + public void test_pushMessageQos1() { + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setTopicName("topicTest"); + mqttHeader.setQosLevel(1); + + MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null); + messageExt.setTopic("topicTest"); + messageExt.setBody("Hello".getBytes()); + BrokerData brokerData = new BrokerData("DefaultCluster", "broker1", null); + for (int i = 0; i < 10; i++) { + mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData); + assertEquals(i + 1, mqttHeader.getPacketId().intValue()); + System.out.println(mqttHeader.getPacketId()); + } + assertEquals(0, mqttSession.getInflightSlots().get()); + assertEquals(10, mqttSession.getInflightWindow().size()); + assertEquals(10, mqttSession.getInflightTimeouts().size()); + + mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData); + + assertEquals(0, mqttSession.getInflightSlots().get()); + assertEquals(10, mqttSession.getInflightWindow().size()); + assertEquals(10, mqttSession.getInflightTimeouts().size()); + } + + @Test + public void test_pubAckReceived() { + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setTopicName("topicTest"); + mqttHeader.setQosLevel(1); + + MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null); + messageExt.setTopic("topicTest"); + messageExt.setBody("Hello".getBytes()); + BrokerData brokerData = new BrokerData("DefaultCluster", "broker1", null); + for (int i = 0; i < 2; i++) { + mqttSession.pushMessageQos1(mqttHeader, messageExt, brokerData); + } + InFlightMessage inFlightMessage = mqttSession.pubAckReceived(1); + assertEquals(9, mqttSession.getInflightSlots().intValue()); + assertEquals(1, mqttSession.getInflightWindow().size()); + assertEquals(false, mqttSession.getInUsePacketIds().containsKey(1)); + } +} diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java index 0bc29091f9b4d9aaff5db38018ea819b97614213..596d529d7decf351f01f037a3a9a441977d522d8 100644 --- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java @@ -22,6 +22,8 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; @@ -42,6 +44,7 @@ public class MqttConnectMessageHandlerTest { @Test public void testHandlerMessage() throws Exception { + this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null); MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java index 204cb8b2e05cac4f6839f96cb9ad255febfdfe52..64daa622f574a3cad5a926162ca46f33dab45e9e 100644 --- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java @@ -18,23 +18,23 @@ package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttMessage; -import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class MqttPingreqMessageHandlerTest { @@ -45,7 +45,7 @@ public class MqttPingreqMessageHandlerTest { @Mock private MqttMessage mqttMessage; @Mock - private Client client; + private MQTTSession client; @Mock private DefaultMqttMessageProcessor processor; diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5fc7422a662df1fcfd9f6af49ee0c40d1f3a0c58 --- /dev/null +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt; + +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.ClientRole; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.InFlightMessage; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; + +@RunWith(MockitoJUnitRunner.class) +public class MqttPubackMessageHandlerTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private MqttPubackMessageHandler mqttPubackMessageHandler; + + @Mock + private RemotingChannel remotingChannel; + + @Before + public void before() { + defaultMqttMessageProcessor = Mockito.spy(new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), null, null, null)); + mqttPubackMessageHandler = new MqttPubackMessageHandler(defaultMqttMessageProcessor); + } + + @Test + public void test_handleMessage_wrongMessageType() { + MqttMessage mqttMessage = new MqttConnectMessage(new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttConnectVariableHeader("name", 0, false, false, false, 1, false, false, 10), new MqttConnectPayload("client1", null, (byte[]) null, null, null)); + + exception.expect(WrongMessageTypeException.class); + mqttPubackMessageHandler.handleMessage(mqttMessage, remotingChannel); + } + + @Test + public void test_handleMessage() { + MqttMessage mqttMessage = new MqttPubAckMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 10), MqttMessageIdVariableHeader.from(1)); + IOTClientManagerImpl iotClientManager = Mockito.mock(IOTClientManagerImpl.class); + Mockito.when((IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager()).thenReturn(iotClientManager); + + MQTTSession mqttSession = Mockito.spy(new MQTTSession("client1", ClientRole.IOTCLIENT, null, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor)); + Mockito.when(iotClientManager.getClient(anyString(), any(RemotingChannel.class))).thenReturn(mqttSession); + InFlightMessage inFlightMessage = Mockito.spy(new InFlightMessage("topicTest", 0, "Hello".getBytes(), null, null, 0)); + doReturn(inFlightMessage).when(mqttSession).pubAckReceived(anyInt()); + RemotingCommand remotingCommand = mqttPubackMessageHandler.handleMessage(mqttMessage, remotingChannel); + assert remotingCommand == null; + } +} diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2103d3133af0c2e830e8d6cd7faf62f2dd05f60b --- /dev/null +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientRole; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class MqttPublishMessageHandlerTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + @Mock + private MqttRemotingServer mqttRemotingServer; + @Mock + private EnodeService enodeService; + @Mock + private NnodeService nnodeService; + + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + + private MqttPublishMessageHandler mqttPublishMessageHandler; + + @Mock + private RemotingChannel remotingChannel; + + private MqttMessage mqttPublishMessage; + + @Before + public void before() { + defaultMqttMessageProcessor = Mockito.spy(new DefaultMqttMessageProcessor(new MqttConfig(), new SnodeConfig(), mqttRemotingServer, enodeService, nnodeService)); + mqttPublishMessageHandler = new MqttPublishMessageHandler(defaultMqttMessageProcessor); + } + + @Test + public void test_handleMessage_wrongMessageType() throws Exception { + MqttMessage mqttMessage = new MqttConnectMessage(new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttConnectVariableHeader("name", 0, false, false, false, 1, false, false, 10), new MqttConnectPayload("client1", null, (byte[]) null, null, null)); + + exception.expect(WrongMessageTypeException.class); + mqttPublishMessageHandler.handleMessage(mqttMessage, remotingChannel); + } + + @Test + public void test_handleMessage_wrongQos() throws Exception { + mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.FAILURE, false, 10), new MqttPublishVariableHeader("topicTest", 1), Unpooled.copiedBuffer("Hello".getBytes())); + + Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel"); + RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel); + assert remotingCommand == null; + Mockito.verify(remotingChannel).close(); + } + + @Test + public void test_handleMessage_qos0() throws Exception { + prepareSubscribeData(); + mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 10), new MqttPublishVariableHeader("topic/c", 1), Unpooled.copiedBuffer("Hello".getBytes())); + Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel"); + RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel); + assert remotingCommand == null; + } + + @Test + public void test_handleMessage_qos1() throws Exception { + prepareSubscribeData(); + mqttPublishMessage = new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 10), new MqttPublishVariableHeader("topic/c", 1), Unpooled.copiedBuffer("Hello".getBytes())); + Mockito.when(remotingChannel.toString()).thenReturn("testRemotingChannel"); + TopicRouteData topicRouteData = buildTopicRouteData(); + Mockito.when(nnodeService.getTopicRouteDataByTopic(anyString(), anyBoolean())).thenReturn(topicRouteData); + CompletableFuture future = new CompletableFuture<>(); +// RemotingCommand response = Mockito.mock(RemotingCommand.class); + Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future); +// doAnswer(mock -> future.complete(response)).when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))); + RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel); + assert remotingCommand != null; + MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); + assertEquals(1, mqttHeader.getPacketId().intValue()); + assertEquals(MqttQoS.AT_MOST_ONCE, mqttHeader.getQosLevel()); + assertEquals(false, mqttHeader.isDup()); + assertEquals(false, mqttHeader.isRetain()); + assertEquals(2, mqttHeader.getRemainingLength()); + } + + private TopicRouteData buildTopicRouteData() { + TopicRouteData topicRouteData = new TopicRouteData(); + List brokerDataList = new ArrayList<>(); + brokerDataList.add(new BrokerData("DefaultCluster", "broker1", null)); + topicRouteData.setBrokerDatas(brokerDataList); + return topicRouteData; + } + + private void prepareSubscribeData() { + IOTClientManagerImpl manager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager(); + ConcurrentHashMap subscriptions = manager.getClientId2Subscription(); + ConcurrentHashMap> topic2Clients = manager.getTopic2Clients(); + + Subscription subscription1 = new Subscription(); + subscription1.setCleanSession(true); + subscription1.getSubscriptionTable().put("topic/a", new MqttSubscriptionData(1, "client1", "topic/a")); + subscription1.getSubscriptionTable().put("topic/+", new MqttSubscriptionData(1, "client1", "topic/+")); + subscriptions.put("client1", subscription1); + + Subscription subscription2 = new Subscription(); + subscription2.setCleanSession(true); + subscription2.getSubscriptionTable().put("topic/b", new MqttSubscriptionData(1, "client2", "topic/b")); + subscription2.getSubscriptionTable().put("topic/+", new MqttSubscriptionData(1, "client2", "topic/+")); + subscriptions.put("client2", subscription2); + + Subscription subscription3 = new Subscription(); + subscription3.setCleanSession(true); + subscription3.getSubscriptionTable().put("test/c", new MqttSubscriptionData(1, "client3", "topic/c")); + subscription3.getSubscriptionTable().put("test/d", new MqttSubscriptionData(1, "client3", "topic/d")); + subscriptions.put("client3", subscription3); + + Set clients_1 = new HashSet<>(); + Set groups = new HashSet(); + groups.add("IOT_GROUP"); + clients_1.add(new MQTTSession("client1", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor)); + clients_1.add(new MQTTSession("client2", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor)); + + Set clients_2 = new HashSet<>(); + groups.add("IOT_GROUP"); + clients_1.add(new MQTTSession("client3", ClientRole.IOTCLIENT, groups, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor)); + + topic2Clients.put("topic", clients_1); + topic2Clients.put("test", clients_2); + + } +} diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttSubscribeMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttSubscribeMessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b3ca4d7343b22c759dfeb5d774e026cbbe8a0507 --- /dev/null +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttSubscribeMessageHandlerTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt; + +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MqttSubscribeMessageHandlerTest { + +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index fffb66827a0128ea16272145f7bd47ed26b20c1e..8675f0a02a5defd87783d617c4b7f70794ee8488 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -58,7 +58,7 @@ public class SnodeStartup { private static final String DEFAULT_MQTT_CONFIG_FILE = "/conf/mqtt.properties"; private static String mqttConfigFileName = System.getProperty("rocketmq.mqtt.config", DEFAULT_MQTT_CONFIG_FILE); - public static void main(String[] args) throws IOException, JoranException { + public static void main(String[] args) throws IOException, JoranException, CloneNotSupportedException { SnodeConfig snodeConfig = loadConfig(args); MqttConfig mqttConfig = loadMqttConfig(snodeConfig); if (snodeConfig.isEmbeddedModeEnable()) { @@ -163,7 +163,7 @@ public class SnodeStartup { return mqttConfig; } - public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException { + public static SnodeController createSnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws JoranException, CloneNotSupportedException { final SnodeController snodeController = new SnodeController(snodeConfig, mqttConfig); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java index c68ca650341884a48fb5a4c044e6e2702b0dd79b..726251df2ffcdcbd8fd51bf40d265eca440a006b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java @@ -73,6 +73,11 @@ public class LocalEnodeServiceImpl implements EnodeService { return completableFuture; } + @Override public RemotingCommand pullMessageSync(RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { + return null; + } + @Override public RemotingCommand creatRetryTopic(RemotingChannel remotingChannel, String enodeName, RemotingCommand request) { try { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java index a8c35a7231363efe2a29275b597743f13f3d7ef5..d220e7db4d775c34ad201cc8c9207d17eaaa6f64 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java @@ -18,30 +18,55 @@ package org.apache.rocketmq.snode; import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; +import org.junit.Before; import org.junit.Test; +import org.mockito.Spy; import static org.assertj.core.api.Assertions.assertThat; public class SnodeControllerTest { + @Spy + private ServerConfig serverConfig = new ServerConfig(); + @Spy + private ClientConfig clientConfig = new ClientConfig(); + @Spy + private ServerConfig mqttServerConfig = new ServerConfig(); + @Spy + private ClientConfig mqttClientConfig = new ClientConfig(); + + private SnodeController snodeController; + + @Before + public void init() throws CloneNotSupportedException { + SnodeConfig snodeConfig = new SnodeConfig(); + snodeConfig.setNettyClientConfig(clientConfig); + serverConfig.setListenPort(10912); + snodeConfig.setNettyServerConfig(serverConfig); + + MqttConfig mqttConfig = new MqttConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + mqttConfig.setMqttClientConfig(mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + + snodeController = new SnodeController(snodeConfig, mqttConfig); + } + @Test public void testSnodeRestart() { - ServerConfig serverConfig = new ServerConfig(); - serverConfig.setListenPort(10912); - SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); assertThat(snodeController.initialize()); snodeController.start(); snodeController.shutdown(); } @Test - public void testSnodeRestartWithAclEnable() { + public void testSnodeRestartWithAclEnable() throws CloneNotSupportedException { System.setProperty("rocketmq.home.dir", "src/test/resources"); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); SnodeConfig snodeConfig = new SnodeConfig(); snodeConfig.setAclEnable(true); - SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); assertThat(snodeController.initialize()); snodeController.start(); snodeController.shutdown(); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index 482b7213ba0d0f8d1779a5e3c533c610d59382fe..c7460ed7a3fe7903b7e8e8119f595efd8055f3cd 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -23,13 +23,15 @@ import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.common.service.EnodeService; -import org.apache.rocketmq.common.service.NnodeService; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,7 +49,15 @@ public class SendMessageProcessorTest { private SendMessageProcessor sendMessageProcessor; @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); + private ServerConfig serverConfig = new ServerConfig(); + @Spy + private ClientConfig clientConfig = new ClientConfig(); + @Spy + private ServerConfig mqttServerConfig = new ServerConfig(); + @Spy + private ClientConfig mqttClientConfig = new ClientConfig(); + + private SnodeController snodeController; @Mock private RemotingChannel remotingChannel; @@ -62,8 +72,22 @@ public class SendMessageProcessorTest { @Mock private NnodeService nnodeService; + public SendMessageProcessorTest() { + } + @Before - public void init() { + public void init() throws CloneNotSupportedException { + SnodeConfig snodeConfig = new SnodeConfig(); + serverConfig.setListenPort(snodeConfig.getListenPort()); + snodeConfig.setNettyClientConfig(clientConfig); + snodeConfig.setNettyServerConfig(serverConfig); + + MqttConfig mqttConfig = new MqttConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + mqttConfig.setMqttClientConfig(mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + + snodeController = new SnodeController(snodeConfig, mqttConfig); snodeController.setNnodeService(nnodeService); snodeController.setEnodeService(enodeService); sendMessageProcessor = new SendMessageProcessor(snodeController); @@ -73,7 +97,7 @@ public class SendMessageProcessorTest { public void testSendMessageV2ProcessRequest() throws RemotingCommandException { CompletableFuture future = new CompletableFuture<>(); RemotingCommand request = createSendMesssageV2Command(); - when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); + when(this.snodeController.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future); sendMessageProcessor.processRequest(remotingChannel, request); } @@ -82,7 +106,7 @@ public class SendMessageProcessorTest { snodeController.setEnodeService(enodeService); CompletableFuture future = new CompletableFuture<>(); RemotingCommand request = createSendBatchMesssageCommand(); - when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); + when(this.snodeController.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future); sendMessageProcessor.processRequest(remotingChannel, request); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index 3a36e0f256d05fb17fa5e71e4f49a4654e9d0722..05cf1ff97e8725c82e4293934b5f938192ea7c2e 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -21,6 +21,8 @@ import java.util.List; import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -46,15 +48,37 @@ import static org.mockito.Mockito.when; public class NnodeServiceImplTest extends SnodeTestBase { @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); + private ServerConfig serverConfig = new ServerConfig(); + @Spy + private ClientConfig clientConfig = new ClientConfig(); + @Spy + private ServerConfig mqttServerConfig = new ServerConfig(); + @Spy + private ClientConfig mqttClientConfig = new ClientConfig(); + + private SnodeController snodeController; @Mock private NettyRemotingClient remotingClient; private NnodeService nnodeService; + public NnodeServiceImplTest() { + } + @Before - public void init() { + public void init() throws CloneNotSupportedException { + SnodeConfig snodeConfig = new SnodeConfig(); + serverConfig.setListenPort(snodeConfig.getListenPort()); + snodeConfig.setNettyClientConfig(clientConfig); + snodeConfig.setNettyServerConfig(serverConfig); + + MqttConfig mqttConfig = new MqttConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + mqttConfig.setMqttClientConfig(mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + + snodeController = new SnodeController(snodeConfig, mqttConfig); snodeController.setRemotingClient(remotingClient); nnodeService = new NnodeServiceImpl(snodeController); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java index 9e3aefe1b880d0cc76191c0cdb8f254bfce2f83e..c33b6855e6282dbeff800607a1ba199cf3f27713 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java @@ -25,7 +25,9 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; @@ -57,7 +59,15 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase { private EnodeService enodeService; @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); + private ServerConfig serverConfig = new ServerConfig(); + @Spy + private ClientConfig clientConfig = new ClientConfig(); + @Spy + private ServerConfig mqttServerConfig = new ServerConfig(); + @Spy + private ClientConfig mqttClientConfig = new ClientConfig(); + + private SnodeController snodeController; @Mock private NnodeService nnodeService; @@ -71,8 +81,22 @@ public class RemoteEnodeServiceImplTest extends SnodeTestBase { private String group = "snodeGroup"; + public RemoteEnodeServiceImplTest() { + } + @Before - public void init() { + public void init() throws CloneNotSupportedException { + SnodeConfig snodeConfig = new SnodeConfig(); + serverConfig.setListenPort(snodeConfig.getListenPort()); + snodeConfig.setNettyClientConfig(clientConfig); + snodeConfig.setNettyServerConfig(serverConfig); + + MqttConfig mqttConfig = new MqttConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + mqttConfig.setMqttClientConfig(mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + + snodeController = new SnodeController(snodeConfig, mqttConfig); snodeController.setNnodeService(nnodeService); snodeController.setRemotingClient(remotingClient); enodeService = new RemoteEnodeServiceImpl(snodeController); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java index 048330dc214cf5eac8e5b0151badd41a8d5fc683..751e25a308f50f0f0c86d8f92fe15c6d346de923 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.service; import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; @@ -36,8 +38,17 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class SlowConsumerServiceImplTest { + + @Spy + private ServerConfig serverConfig = new ServerConfig(); + @Spy + private ClientConfig clientConfig = new ClientConfig(); @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); + private ServerConfig mqttServerConfig = new ServerConfig(); + @Spy + private ClientConfig mqttClientConfig = new ClientConfig(); + + private SnodeController snodeController; private final String enodeName = "testEndoe"; @@ -52,8 +63,22 @@ public class SlowConsumerServiceImplTest { @Mock private ConsumerOffsetManager consumerOffsetManager; + public SlowConsumerServiceImplTest() { + } + @Before - public void init() { + public void init() throws CloneNotSupportedException { + SnodeConfig snodeConfig = new SnodeConfig(); + serverConfig.setListenPort(snodeConfig.getListenPort()); + snodeConfig.setNettyClientConfig(clientConfig); + snodeConfig.setNettyServerConfig(serverConfig); + + MqttConfig mqttConfig = new MqttConfig(); + mqttServerConfig.setListenPort(mqttConfig.getListenPort()); + mqttConfig.setMqttClientConfig(mqttClientConfig); + mqttConfig.setMqttServerConfig(mqttServerConfig); + + snodeController = new SnodeController(snodeConfig, mqttConfig); slowConsumerService = new SlowConsumerServiceImpl(snodeController); } diff --git a/test/pom.xml b/test/pom.xml index ba603c815d4ee84a11d54c6b1b6946c503ea8333..43e1cd96279563d83a6c53477544ef8d90318ce8 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -46,6 +46,10 @@ truth 0.30 + + ${project.groupId} + rocketmq-common + diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java index e524fb3dc048ecf8fbe8f4abff2db64a217e2807..36c39ba494179ec9a7f4160a84fb31cae34c4c73 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java @@ -49,7 +49,7 @@ public class ChinaPropIT extends BaseConf { /** * @since version3.4.6 */ - @Test(expected = org.apache.rocketmq.client.exception.MQBrokerException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQBrokerException.class) public void testSend20kChinaPropMsg() throws Exception { Message msg = MessageFactory.getRandomMessage(topic); msg.putUserProperty("key", RandomUtils.getCheseWord(32 * 1024 + 1)); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java index 716ac511afa599e27ea83d94aa33b235ec11e451..efae103c2f1f883b8bb947637ec2d71193a6c654 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java @@ -59,45 +59,45 @@ public class MessageExceptionIT extends BaseConf { assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendNullMessage() throws Exception { producer.send((Message) null); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendNullBodyMessage() throws Exception { Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); msg.setBody(null); producer.send(msg); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendZeroSizeBodyMessage() throws Exception { Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); msg.setBody(new byte[0]); producer.send(msg); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendOutOfSizeBodyMessage() throws Exception { Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); msg.setBody(new byte[1024 * 1024 * 4 + 1]); producer.send(msg); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendNullTopicMessage() throws Exception { Message msg = new Message(null, RandomUtils.getStringByUUID().getBytes()); producer.send(msg); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSynSendBlankTopicMessage() throws Exception { Message msg = new Message("", RandomUtils.getStringByUUID().getBytes()); producer.send(msg); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSend128kMsg() throws Exception { Message msg = new Message(topic, RandomUtils.getStringWithNumber(1024 * 1024 * 4 + 1).getBytes()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java index 1113689bed2f801ee5af8c3845ed5a5f0d6aa2bf..666d312c094f277c8be94f58e5e20eea6e49d0ad 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java @@ -55,7 +55,7 @@ public class OneWaySendExceptionIT extends BaseConf { producer.sendOneway(msg, messageQueue); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSendSelectorNull() throws Exception { Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); @@ -63,7 +63,7 @@ public class OneWaySendExceptionIT extends BaseConf { producer.sendOneway(msg, selector, 100); } - @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + @Test(expected = org.apache.rocketmq.common.exception.MQClientException.class) public void testSelectorThrowsException() throws Exception { Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);