From 3629a0a7171a2aad381f715e6250feec1ee3b779 Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Thu, 28 Feb 2019 15:54:19 +0800 Subject: [PATCH] add new module rocketmq-mqtt --- .../apache/rocketmq/common/MqttConfig.java | 10 ++ .../rocketmq/common}/client/Client.java | 3 +- .../common}/client/ClientManager.java | 2 +- .../common/client}/ClientManagerImpl.java | 10 +- .../rocketmq/common/client}/ClientRole.java | 19 +++- .../rocketmq/common/client}/Subscription.java | 4 +- .../rocketmq/common/constant/LoggerName.java | 1 + distribution/conf/snode.conf | 2 +- distribution/release.xml | 1 + .../example/mqtt/MqttSampleConsumer.java | 6 +- mqtt/pom.xml | 103 ++++++++++++++++++ .../mqtt/client}/IOTClientManagerImpl.java | 21 ++-- .../client/MqttClientHousekeepingService.java | 98 +++++++++++++++++ .../rocketmq/mqtt}/constant/MqttConstant.java | 7 +- .../mqtt}/exception/MqttConnectException.java | 2 +- .../exception/WrongMessageTypeException.java | 2 +- .../mqtt}/mqtthandler/MessageHandler.java | 2 +- .../impl}/MqttConnectMessageHandler.java | 42 +++---- .../impl}/MqttDisconnectMessageHandler.java | 22 ++-- .../impl}/MqttMessageForwarder.java | 21 ++-- .../mqtthandler/impl}/MqttMessageSender.java | 17 ++- .../impl}/MqttPingreqMessageHandler.java | 25 ++--- .../impl}/MqttPubackMessageHandler.java | 24 ++-- .../impl}/MqttPubcompMessageHandler.java | 18 ++- .../impl}/MqttPublishMessageHandler.java | 21 ++-- .../impl}/MqttPubrecMessageHandler.java | 18 ++- .../impl}/MqttPubrelMessageHandler.java | 17 ++- .../impl}/MqttSubscribeMessageHandler.java | 29 ++--- .../impl}/MqttUnsubscribeMessagHandler.java | 28 ++--- .../DefaultMqttMessageProcessor.java | 92 +++++++++++----- .../mqtt}/service/WillMessageService.java | 2 +- .../service/impl/MqttPushServiceImpl.java | 32 +++--- .../service/impl/WillMessageServiceImpl.java | 9 +- .../apache/rocketmq/mqtt}/util/MqttUtil.java | 4 +- .../DefaultMqttMessageProcessorTest.java | 15 ++- .../mqtt}/MqttConnectMessageHandlerTest.java | 13 ++- .../MqttDisconnectMessageHandlerTest.java | 22 ++-- .../mqtt}/WillMessageServiceImplTest.java | 17 +-- pom.xml | 6 + snode/pom.xml | 4 + .../rocketmq/snode/SnodeController.java | 45 ++++---- .../client/ClientHousekeepingService.java | 10 +- .../snode/client/SubscriptionManager.java | 2 +- .../client/impl/ConsumerManagerImpl.java | 1 + .../client/impl/ProducerManagerImpl.java | 1 + .../client/impl/SubscriptionManagerImpl.java | 1 + .../snode/constant/SnodeConstant.java | 4 +- .../snode/processor/HeartbeatProcessor.java | 4 +- .../snode/processor/PullMessageProcessor.java | 2 +- .../snode/service/impl/PushServiceImpl.java | 4 +- 50 files changed, 580 insertions(+), 285 deletions(-) rename {snode/src/main/java/org/apache/rocketmq/snode => common/src/main/java/org/apache/rocketmq/common}/client/Client.java (98%) rename {snode/src/main/java/org/apache/rocketmq/snode => common/src/main/java/org/apache/rocketmq/common}/client/ClientManager.java (96%) rename {snode/src/main/java/org/apache/rocketmq/snode/client/impl => common/src/main/java/org/apache/rocketmq/common/client}/ClientManagerImpl.java (96%) rename {snode/src/main/java/org/apache/rocketmq/snode/client/impl => common/src/main/java/org/apache/rocketmq/common/client}/ClientRole.java (50%) rename {snode/src/main/java/org/apache/rocketmq/snode/client/impl => common/src/main/java/org/apache/rocketmq/common/client}/Subscription.java (96%) create mode 100644 mqtt/pom.xml rename {snode/src/main/java/org/apache/rocketmq/snode/client/impl => mqtt/src/main/java/org/apache/rocketmq/mqtt/client}/IOTClientManagerImpl.java (90%) create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/constant/MqttConstant.java (78%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/exception/MqttConnectException.java (95%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/exception/WrongMessageTypeException.java (95%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor => mqtt/src/main/java/org/apache/rocketmq/mqtt}/mqtthandler/MessageHandler.java (95%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttConnectMessageHandler.java (86%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttDisconnectMessageHandler.java (78%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttMessageForwarder.java (66%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttMessageSender.java (66%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPingreqMessageHandler.java (58%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPubackMessageHandler.java (59%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPubcompMessageHandler.java (65%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPublishMessageHandler.java (84%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPubrecMessageHandler.java (65%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttPubrelMessageHandler.java (65%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttSubscribeMessageHandler.java (90%) rename {snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler => mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl}/MqttUnsubscribeMessagHandler.java (86%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/processor/DefaultMqttMessageProcessor.java (64%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/service/WillMessageService.java (96%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/service/impl/MqttPushServiceImpl.java (86%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/service/impl/WillMessageServiceImpl.java (81%) rename {snode/src/main/java/org/apache/rocketmq/snode => mqtt/src/main/java/org/apache/rocketmq/mqtt}/util/MqttUtil.java (96%) rename {snode/src/test/java/org/apache/rocketmq/snode/processor => mqtt/src/test/java/org/apache/rocketmq/mqtt}/DefaultMqttMessageProcessorTest.java (92%) rename {snode/src/test/java/org/apache/rocketmq/snode/processor => mqtt/src/test/java/org/apache/rocketmq/mqtt}/MqttConnectMessageHandlerTest.java (85%) rename {snode/src/test/java/org/apache/rocketmq/snode/processor => mqtt/src/test/java/org/apache/rocketmq/mqtt}/MqttDisconnectMessageHandlerTest.java (73%) rename {snode/src/test/java/org/apache/rocketmq/snode/service => mqtt/src/test/java/org/apache/rocketmq/mqtt}/WillMessageServiceImplTest.java (71%) diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java index fc9128d1..2966a3ba 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -50,6 +50,8 @@ public class MqttConfig { @ImportantField private boolean aclEnable = false; + private long houseKeepingInterval = 10 * 1000; + public int getListenPort() { return listenPort; } @@ -130,4 +132,12 @@ public class MqttConfig { public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) { this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity; } + + public long getHouseKeepingInterval() { + return houseKeepingInterval; + } + + public void setHouseKeepingInterval(long houseKeepingInterval) { + this.houseKeepingInterval = houseKeepingInterval; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/common/src/main/java/org/apache/rocketmq/common/client/Client.java similarity index 98% rename from snode/src/main/java/org/apache/rocketmq/snode/client/Client.java rename to common/src/main/java/org/apache/rocketmq/common/client/Client.java index abaf30df..1719b6bc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/Client.java @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client; +package org.apache.rocketmq.common.client; import java.util.Objects; import java.util.Set; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.serialize.LanguageCode; -import org.apache.rocketmq.snode.client.impl.ClientRole; public class Client { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java index 600dd16e..c42244b5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client; +package org.apache.rocketmq.common.client; import java.util.List; import java.util.Set; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java index d0cddcec..e39ca701 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.common.client; import java.util.ArrayList; import java.util.Iterator; @@ -33,8 +33,6 @@ import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.ClientManager; public abstract class ClientManagerImpl implements ClientManager { @@ -45,7 +43,7 @@ public abstract class ClientManagerImpl implements ClientManager { .newSingleThreadScheduledExecutor( new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); - private final ConcurrentHashMap> groupClientTable = new ConcurrentHashMap<>( + private final ConcurrentHashMap> groupClientTable = new ConcurrentHashMap>( 1024); public abstract void onClosed(String group, RemotingChannel remotingChannel); @@ -176,7 +174,7 @@ public abstract class ClientManagerImpl implements ClientManager { public List getChannels(String groupId) { if (groupId != null) { - List result = new ArrayList<>(); + List result = new ArrayList(); ConcurrentHashMap channelsMap = this.groupClientTable.get(groupId); if (channelsMap != null) { result.addAll(this.groupClientTable.get(groupId).keySet()); @@ -189,7 +187,7 @@ public abstract class ClientManagerImpl implements ClientManager { @Override public List getAllClientId(String groupId) { - List result = new ArrayList<>(); + List result = new ArrayList(); Map channelClientMap = this.groupClientTable.get(groupId); if (channelClientMap != null) { Iterator> it = channelClientMap.entrySet() diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java similarity index 50% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java index 9967976d..5abe69d5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java @@ -1,4 +1,21 @@ -package org.apache.rocketmq.snode.client.impl;/* +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.client;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java rename to common/src/main/java/org/apache/rocketmq/common/client/Subscription.java index 895d4c9a..3c524b34 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.common.client; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -27,7 +27,7 @@ public class Subscription { private volatile MessageModel messageModel; private volatile ConsumeFromWhere consumeFromWhere; private volatile boolean cleanSession; - ConcurrentHashMap subscriptionTable = new ConcurrentHashMap<>(); + ConcurrentHashMap subscriptionTable = new ConcurrentHashMap(); private volatile long lastUpdateTimestamp = System.currentTimeMillis(); public SubscriptionData getSubscriptionData(String topic) { diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index 48295a38..637e112d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -38,4 +38,5 @@ public class LoggerName { public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark"; public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; public static final String SNODE_LOGGER_NAME = "RocketmqSnode"; + public static final String MQTT_LOGGER_NAME = "RocketmqMQTT"; } diff --git a/distribution/conf/snode.conf b/distribution/conf/snode.conf index 649057d9..c710b8c6 100644 --- a/distribution/conf/snode.conf +++ b/distribution/conf/snode.conf @@ -15,4 +15,4 @@ namesrvAddr=localhost:9876 clusterName = DefaultCluster snodeName = snode-a - +embeddedModeEnable = false diff --git a/distribution/release.xml b/distribution/release.xml index 181a99ec..60ca3319 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -67,6 +67,7 @@ org.apache.rocketmq:rocketmq-namesrv org.apache.rocketmq:rocketmq-example org.apache.rocketmq:rocketmq-openmessaging + org.apache.rocketmq:rocketmq-mqtt org.apache.rocketmq:rocketmq-snode diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java index 8c52a8f1..b2c6bb3e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java @@ -50,17 +50,17 @@ public class MqttSampleConsumer { log.info("Connected"); sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { - System.out.println("connection lost." + throwable.getLocalizedMessage()); + log.info("connection lost." + throwable.getLocalizedMessage()); } @Override public void messageArrived(String s, MqttMessage message) throws Exception { - System.out.println(message.toString()); + log.info(message.toString()); // System.exit(0); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { - System.out.println("delivery complete." + token.getMessage().toString()); + log.info("delivery complete." + token.getMessage().toString()); } catch (MqttException e) { e.printStackTrace(); } diff --git a/mqtt/pom.xml b/mqtt/pom.xml new file mode 100644 index 00000000..01b19b82 --- /dev/null +++ b/mqtt/pom.xml @@ -0,0 +1,103 @@ + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-all + 4.4.1-SNAPSHOT + + rocketmq-mqtt + rocketmq-mqtt ${project.version} + + http://maven.apache.org + + UTF-8 + + + + ${project.groupId} + rocketmq-common + + + ${project.groupId} + rocketmq-store + + + ${project.groupId} + rocketmq-remoting + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-srvutil + + + ${project.groupId} + rocketmq-filter + + + ${project.groupId} + rocketmq-acl + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + com.alibaba + fastjson + + + org.javassist + javassist + + + org.slf4j + slf4j-api + + + org.yaml + snakeyaml + + + io.prometheus + simpleclient + + + io.prometheus + simpleclient_httpserver + + + io.prometheus + simpleclient_hotspot + + + ${project.groupId} + rocketmq-broker + + + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java similarity index 90% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java index 66ec2f7e..5f56d16b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java @@ -14,34 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.mqtt.client; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManagerImpl; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; public class IOTClientManagerImpl extends ClientManagerImpl { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); public static final String IOT_GROUP = "IOT_GROUP"; - private final SnodeController snodeController; private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); - public IOTClientManagerImpl(SnodeController snodeController) { - this.snodeController = snodeController; + public IOTClientManagerImpl() { } public void onUnsubscribe(Client client, List topics) { @@ -89,6 +88,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl { } iterator1.remove(); } + if (next.getValue() == null || next.getValue().size() == 0) { + iterator.remove(); + } } //remove offline messages } @@ -97,10 +99,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl { return clientId2Subscription.get(clientId); } - public SnodeController getSnodeController() { - return snodeController; - } - public ConcurrentHashMap>> getTopic2SubscriptionTable() { return topic2SubscriptionTable; } @@ -112,5 +110,4 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public void initSubscription(String clientId, Subscription subscription) { clientId2Subscription.put(clientId, subscription); } - } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java new file mode 100644 index 00000000..fd2fe0f4 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt.client; + +import io.netty.channel.Channel; +import io.netty.util.Attribute; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; + +public class MqttClientHousekeepingService implements ChannelEventListener { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final ClientManager iotClientManager; + + public MqttClientHousekeepingService(final ClientManager iotClientManager) { + this.iotClientManager = iotClientManager; + } + + public void start(long interval) { + this.iotClientManager.startScan(interval); + } + + public void shutdown() { + this.iotClientManager.shutdown(); + } + + private Client getClient(RemotingChannel remotingChannel) { + if (remotingChannel instanceof NettyChannelImpl) { + Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); + Attribute clientAttribute = channel.attr(MqttConstant.MQTT_CLIENT_ATTRIBUTE_KEY); + if (clientAttribute != null) { + Client client = clientAttribute.get(); + return client; + } + } + log.warn("RemotingChannel type error: {}", remotingChannel.getClass()); + return null; + } + + private void closeChannel(String remoteAddress, RemotingChannel remotingChannel) { + Client client = getClient(remotingChannel); + if (client != null) { + switch (client.getClientRole()) { + case IOTCLIENT: + this.iotClientManager.onClose(client.getGroups(), remotingChannel); + return; + default: + } + } + log.warn("Close channel without any role"); + } + + @Override + public void onChannelConnect(String remoteAddr, RemotingChannel channel) { + log.info("Remoting channel connected: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress())); + + } + + @Override + public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + } + + @Override + public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + + } + + @Override + public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java similarity index 78% rename from snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java index 2d2ecaa2..9e803a39 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java @@ -15,11 +15,16 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.constant; +package org.apache.rocketmq.mqtt.constant; + +import io.netty.util.AttributeKey; +import org.apache.rocketmq.common.client.Client; public class MqttConstant { public static final int MAX_SUPPORTED_QOS = 0; public static final String SUBSCRIPTION_FLAG_PLUS = "+"; public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_SEPARATOR = "/"; + public static final long DEFAULT_TIMEOUT_MILLS = 3000L; + public static final AttributeKey MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java index f636842f..93eb7333 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.exception; +package org.apache.rocketmq.mqtt.exception; public class MqttConnectException extends RuntimeException { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java index 355c7f77..d008d78d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.exception; +package org.apache.rocketmq.mqtt.exception; public class WrongMessageTypeException extends RuntimeException { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java index 24153165..fa0be8bc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler; import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.rocketmq.remoting.RemotingChannel; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java index 96f2843b..ecbb1247 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; @@ -25,32 +25,32 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.HashSet; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; +import org.apache.rocketmq.common.client.ClientRole; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.exception.MqttConnectException; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.ClientManager; -import org.apache.rocketmq.snode.client.impl.ClientRole; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.exception.MqttConnectException; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; public class MqttConnectMessageHandler implements MessageHandler { - - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttConnectMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttConnectMessageHandler(DefaultMqttMessageProcessor defaultMqttMessageProcessor) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; } @Override @@ -97,7 +97,7 @@ public class MqttConnectMessageHandler implements MessageHandler { remotingChannel.close(); return null; } - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); //set Session Present according to whether the server has already stored Session State for the clientId if (mqttConnectMessage.variableHeader().isCleanSession()) { mqttHeader.setSessionPresent(false); @@ -120,7 +120,11 @@ public class MqttConnectMessageHandler implements MessageHandler { Client client = new Client(); client.setClientId(payload.clientIdentifier()); client.setClientRole(ClientRole.IOTCLIENT); - client.setGroups(new HashSet(){{add("IOT_GROUP");}}); + client.setGroups(new HashSet() { + { + add("IOT_GROUP"); + } + }); client.setConnected(true); client.setRemotingChannel(remotingChannel); client.setLastUpdateTimestamp(System.currentTimeMillis()); @@ -138,7 +142,7 @@ public class MqttConnectMessageHandler implements MessageHandler { willMessage.setWillTopic(payload.willTopic()); willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain()); willMessage.setBody(payload.willMessageInBytes()); - snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); + defaultMqttMessageProcessor.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); } mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name()); @@ -148,7 +152,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } private boolean alreadyStoredSession(String clientId) { - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId); if (subscription == null) { return false; @@ -164,7 +168,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } private boolean isConnected(RemotingChannel remotingChannel, String clientId) { - ClientManager iotClientManager = snodeController.getIotClientManager(); + ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { return true; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java similarity index 78% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java index 66affac4..5f21a4b2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java @@ -15,27 +15,27 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; public class MqttDisconnectMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; - - public MqttDisconnectMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttDisconnectMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** @@ -56,10 +56,10 @@ public class MqttDisconnectMessageHandler implements MessageHandler { } //discard will message associated with the current connection(client) - Client client = snodeController.getIotClientManager() + Client client = defaultMqttMessageProcessor.getIotClientManager() .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null) { - snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); + defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId()); } client.setConnected(false); if (remotingChannel.isActive()) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java similarity index 66% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java index eaba1a4a..7cc9d029 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java @@ -15,24 +15,23 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttMessageForwarder implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; -/* private SubscriptionStore subscriptionStore; - - public MqttMessageForwarder(SubscriptionStore subscriptionStore) { - this.subscriptionStore = subscriptionStore; - }*/ - - public MqttMessageForwarder(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttMessageForwarder(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java similarity index 66% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java index 624a4bf3..e0e2ddff 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java @@ -15,20 +15,25 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttMessageSender implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttMessageSender(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttMessageSender(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * send the PUBLISH message to client * diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java similarity index 58% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java index 1f75fcc6..d867476f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java @@ -15,29 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPingreqMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPingreqMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPingreqMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** - * handle the PINGREQ message from client - *
    - *
  1. check client exists
  2. - *
  3. check client is connected
  4. - *
  5. generate the PINGRESP message
  6. - *
  7. send the PINGRESP message to the client
  8. - *
+ * handle the PINGREQ message from client
  1. check client exists
  2. check client is connected
  3. + *
  4. generate the PINGRESP message
  5. send the PINGRESP message to the client
* * @param message * @return diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java similarity index 59% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java index 69dee269..71d94589 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java @@ -15,26 +15,30 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubackMessageHandler implements MessageHandler { - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttPubackMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubackMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** - * handle the PUBACK message from the client - *
    - *
  1. remove the message from the published in-flight messages
  2. - *
  3. ack the message in the MessageStore
  4. - *
+ * handle the PUBACK message from the client
  1. remove the message from the published in-flight messages
  2. + *
  3. ack the message in the MessageStore
+ * * @param * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java index e191f5a4..2b1109a7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java @@ -15,22 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubcompMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubcompMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubcompMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * handle the PUBCOMP message from the client + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java similarity index 84% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java index 17776ea0..15552b01 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttFixedHeader; @@ -27,21 +27,20 @@ import io.netty.handler.codec.mqtt.MqttQoS; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.util.MqttUtil; public class MqttPublishMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - - private final SnodeController snodeController; - - public MqttPublishMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } @Override @@ -61,7 +60,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ByteBuf payload = mqttPublishMessage.payload(); if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { - snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); + defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { //Push messages to subscribers and add it to IN-FLIGHT messages } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java index edd0d55c..e2f10290 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java @@ -15,22 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubrecMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubrecMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubrecMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * handle the PUBREC message from the clinet + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java index c1061e4d..284a460e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java @@ -15,23 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubrelMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubrelMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubrelMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** * handle the PUBREL message from the client + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java similarity index 90% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java index 34f91d36..31cfd0c2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -29,31 +29,32 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.constant.MqttConstant; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; public class MqttSubscribeMessageHandler implements MessageHandler { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttSubscribeMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttSubscribeMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** @@ -72,7 +73,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { } MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message; MqttSubscribePayload payload = mqttSubscribeMessage.payload(); - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client == null) { log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); @@ -135,7 +136,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { //2.update topic2SubscriptionTable String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName()); ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); - if (client2SubscriptionData == null) { + if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) { client2SubscriptionData = new ConcurrentHashMap<>(); ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData); if (prev != null) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java index 38f6519d..34c7fdad 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; @@ -27,31 +27,31 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; /** * handle the UNSUBSCRIBE message from the client */ public class MqttUnsubscribeMessagHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; - - public MqttUnsubscribeMessagHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttUnsubscribeMessagHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } @Override @@ -73,7 +73,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { remotingChannel.close(); return null; } - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client == null) { log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); @@ -98,7 +98,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { } private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) { - ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); + ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); for (String topicFilter : topics) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java similarity index 64% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 9eb62c24..563fdac4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt.processor; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -33,55 +33,73 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MqttClientHousekeepingService; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubcompMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler; +import org.apache.rocketmq.mqtt.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl; +import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private Map type2handler = new HashMap<>(); - private final SnodeController snodeController; private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; + private WillMessageService willMessageService; + private MqttPushServiceImpl mqttPushService; + private ClientManager iotClientManager; + private RemotingServer mqttRemotingServer; + private MqttClientHousekeepingService mqttClientHousekeepingService; + private MqttConfig mqttConfig; + + public DefaultMqttMessageProcessor(MqttConfig mqttConfig, RemotingServer mqttRemotingServer) { + this.willMessageService = new WillMessageServiceImpl(); + this.mqttPushService = new MqttPushServiceImpl(this, mqttConfig); + this.iotClientManager = new IOTClientManagerImpl(); + this.mqttRemotingServer = mqttRemotingServer; + this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager); + this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval()); - public DefaultMqttMessageProcessor(SnodeController snodeController) { - this.snodeController = snodeController; registerMessageHandler(MqttMessageType.CONNECT, - new MqttConnectMessageHandler(this.snodeController)); + new MqttConnectMessageHandler(this)); registerMessageHandler(MqttMessageType.DISCONNECT, - new MqttDisconnectMessageHandler(this.snodeController)); + new MqttDisconnectMessageHandler(this)); registerMessageHandler(MqttMessageType.PINGREQ, - new MqttPingreqMessageHandler(this.snodeController)); + new MqttPingreqMessageHandler(this)); registerMessageHandler(MqttMessageType.PUBLISH, - new MqttPublishMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController)); + new MqttPublishMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); registerMessageHandler(MqttMessageType.PUBCOMP, - new MqttPubcompMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController)); + new MqttPubcompMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); registerMessageHandler(MqttMessageType.SUBSCRIBE, - new MqttSubscribeMessageHandler(this.snodeController)); + new MqttSubscribeMessageHandler(this)); registerMessageHandler(MqttMessageType.UNSUBSCRIBE, - new MqttUnsubscribeMessagHandler(this.snodeController)); + new MqttUnsubscribeMessagHandler(this)); } @Override @@ -127,4 +145,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { type2handler.put(type, handler); } + + public WillMessageService getWillMessageService() { + return willMessageService; + } + + public MqttPushServiceImpl getMqttPushService() { + return mqttPushService; + } + + public ClientManager getIotClientManager() { + return iotClientManager; + } + + public MqttConfig getMqttConfig() { + return mqttConfig; + } + + public RemotingServer getMqttRemotingServer() { + return mqttRemotingServer; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java index 516bde5f..f88d4530 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.service; +package org.apache.rocketmq.mqtt.service; import org.apache.rocketmq.common.message.mqtt.WillMessage; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java index e8530259..9b9b17e1 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.service.impl; +package org.apache.rocketmq.mqtt.service.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -27,37 +27,38 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.constant.SnodeConstant; -import org.apache.rocketmq.snode.util.MqttUtil; public class MqttPushServiceImpl { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); - private SnodeController snodeController; private ExecutorService pushMqttMessageExecutorService; + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttPushServiceImpl(final SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor( - this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(), - this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(), + mqttConfig.getPushMqttMessageMinPoolSize(), + mqttConfig.getPushMqttMessageMaxPoolSize(), 3000, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()), + new ArrayBlockingQueue<>(mqttConfig.getPushMqttMessageThreadPoolQueueCapacity()), "pushMqttMessageThread", false); } @@ -86,7 +87,7 @@ public class MqttPushServiceImpl { RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); //find those clients publishing the message to - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); Set clients = new HashSet<>(); if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) { @@ -109,12 +110,11 @@ public class MqttPushServiceImpl { byte[] body = new byte[message.readableBytes()]; message.readBytes(body); requestCommand.setBody(body); - snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS); } } catch (Exception ex) { log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); } finally { - System.out.println("Release Bytebuf"); ReferenceCountUtil.release(message); } } else { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java similarity index 81% rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java index 78b4c1e3..a769d907 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java @@ -15,20 +15,17 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.service.impl; +package org.apache.rocketmq.mqtt.service.impl; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.WillMessageService; public class WillMessageServiceImpl implements WillMessageService { private static ConcurrentHashMap willMessageTable = new ConcurrentHashMap<>(); - private final SnodeController snodeController; - public WillMessageServiceImpl(SnodeController snodeController) { - this.snodeController = snodeController; + public WillMessageServiceImpl() { } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java index ef44a7ac..3eefc48e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.util; +package org.apache.rocketmq.mqtt.util; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.UUID; -import org.apache.rocketmq.snode.constant.MqttConstant; +import org.apache.rocketmq.mqtt.constant.MqttConstant; public class MqttUtil { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java similarity index 92% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java index 076a0052..570a8640 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java @@ -14,38 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class DefaultMqttMessageProcessorTest { private DefaultMqttMessageProcessor defaultMqttMessageProcessor; - @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); - @Mock private RemotingChannel remotingChannel; + @Mock + private MqttRemotingServer mqttRemotingServer; + private String topic = "SnodeTopic"; private String group = "SnodeGroup"; @@ -54,7 +53,7 @@ public class DefaultMqttMessageProcessorTest { @Before public void init() { - defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController); + defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer); } @Test diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java similarity index 85% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java index b0301e7e..0bc29091 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; @@ -22,11 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -38,10 +36,13 @@ public class MqttConnectMessageHandlerTest { @Mock private RemotingChannel remotingChannel; + @Mock + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + @Test public void testHandlerMessage() throws Exception { - MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig())); + MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes())); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java similarity index 73% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java index 0f474b13..088b6230 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java @@ -14,20 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -39,17 +37,19 @@ public class MqttDisconnectMessageHandlerTest { @Mock private RemotingChannel remotingChannel; + @Mock + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + @Test public void testHandlerMessage() throws Exception { - SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( - snodeController); + defaultMqttMessageProcessor); Client client = new Client(); client.setRemotingChannel(remotingChannel); client.setClientId("123456"); - snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); - snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage()); + defaultMqttMessageProcessor.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); + defaultMqttMessageProcessor.getWillMessageService().saveWillMessage("123456", new WillMessage()); MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader( MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java similarity index 71% rename from snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java index 57f7c7a5..dfce1148 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java @@ -14,31 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.service; +package org.apache.rocketmq.mqtt; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; +import org.apache.rocketmq.mqtt.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class WillMessageServiceImplTest extends SnodeTestBase { - - @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); +public class WillMessageServiceImplTest { private WillMessageService willMessageService; @Before public void init() { - willMessageService = new WillMessageServiceImpl(snodeController); + willMessageService = new WillMessageServiceImpl(); } @Test diff --git a/pom.xml b/pom.xml index 3899247f..ed34c04d 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,7 @@ distribution openmessaging logging + mqtt snode acl @@ -533,6 +534,11 @@ rocketmq-snode ${project.version} + + ${project.groupId} + rocketmq-mqtt + ${project.version} + org.slf4j slf4j-api diff --git a/snode/pom.xml b/snode/pom.xml index da1e3f3e..17158c0c 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -97,6 +97,10 @@ ${project.groupId} rocketmq-broker + + ${project.groupId} + rocketmq-mqtt + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 6f54ae02..d3a48b78 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -27,11 +27,13 @@ import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClientFactory; @@ -48,18 +50,15 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.snode.client.ClientHousekeepingService; -import org.apache.rocketmq.snode.client.ClientManager; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.client.SubscriptionManager; import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; -import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.PullMessageProcessor; import org.apache.rocketmq.snode.processor.SendMessageProcessor; @@ -69,16 +68,13 @@ import org.apache.rocketmq.snode.service.MetricsService; import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.ScheduledService; -import org.apache.rocketmq.snode.service.WillMessageService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; -import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; -import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; public class SnodeController { @@ -105,7 +101,7 @@ public class SnodeController { private ScheduledService scheduledService; private ClientManager producerManager; private ClientManager consumerManager; - private ClientManager iotClientManager; +// private ClientManager iotClientManager; private SubscriptionManager subscriptionManager; private ClientHousekeepingService clientHousekeepingService; private SubscriptionGroupManager subscriptionGroupManager; @@ -122,8 +118,8 @@ public class SnodeController { private ClientService clientService; private SlowConsumerService slowConsumerService; private MetricsService metricsService; - private WillMessageService willMessageService; - private MqttPushServiceImpl mqttPushService; +// private WillMessageService willMessageService; +// private MqttPushServiceImpl mqttPushService; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( @@ -153,7 +149,12 @@ public class SnodeController { if (this.mqttRemotingClient != null) { this.mqttRemotingClient.init(this.mqttClientConfig, null); } - + this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( + RemotingUtil.MQTT_PROTOCOL); + if (this.mqttRemotingServer != null) { + this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); + this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + } this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(), @@ -211,19 +212,19 @@ public class SnodeController { this.sendMessageProcessor = new SendMessageProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this); - this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this); + this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer); this.pushService = new PushServiceImpl(this); this.clientService = new ClientServiceImpl(this); this.subscriptionManager = new SubscriptionManagerImpl(); this.producerManager = new ProducerManagerImpl(); this.consumerManager = new ConsumerManagerImpl(this); - this.iotClientManager = new IOTClientManagerImpl(this); +// this.iotClientManager = new IOTClientManagerImpl(this); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, - this.consumerManager, this.iotClientManager); + this.consumerManager, null); this.slowConsumerService = new SlowConsumerServiceImpl(this); this.metricsService = new MetricsServiceImpl(); - this.willMessageService = new WillMessageServiceImpl(this); - this.mqttPushService = new MqttPushServiceImpl(this); +// this.willMessageService = new WillMessageServiceImpl(this); +// this.mqttPushService = new MqttPushServiceImpl(this); } public SnodeConfig getSnodeConfig() { @@ -258,12 +259,6 @@ public class SnodeController { this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService); this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); } - this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL); - if (this.mqttRemotingServer != null) { - this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); - this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); - } registerProcessor(); return true; } @@ -507,13 +502,13 @@ public class SnodeController { this.consumerManager = consumerManager; } - public ClientManager getIotClientManager() { +/* public ClientManager getIotClientManager() { return iotClientManager; } public void setIotClientManager(ClientManager iotClientManager) { this.iotClientManager = iotClientManager; - } + }*/ public SubscriptionManager getSubscriptionManager() { return subscriptionManager; @@ -551,7 +546,7 @@ public class SnodeController { this.metricsService = metricsService; } - public WillMessageService getWillMessageService() { +/* public WillMessageService getWillMessageService() { return willMessageService; } @@ -566,5 +561,5 @@ public class SnodeController { public void setMqttPushService(MqttPushServiceImpl mqttPushService) { this.mqttPushService = mqttPushService; - } + }*/ } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 899a9ef2..590f479b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.client; import io.netty.channel.Channel; import io.netty.util.Attribute; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -49,7 +51,7 @@ public class ClientHousekeepingService implements ChannelEventListener { public void shutdown() { this.producerManager.shutdown(); this.consumerManager.shutdown(); - this.iotClientManager.shutdown(); +// this.iotClientManager.shutdown(); } private Client getClient(RemotingChannel remotingChannel) { @@ -75,9 +77,9 @@ public class ClientHousekeepingService implements ChannelEventListener { case Producer: this.producerManager.onClose(client.getGroups(), remotingChannel); return; - case IOTCLIENT: - this.iotClientManager.onClose(client.getGroups(), remotingChannel); - return; +// case IOTCLIENT: +// this.iotClientManager.onClose(client.getGroups(), remotingChannel); +// return; default: } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java index a648ba81..4247a4d2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.snode.client; import java.util.Set; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.client.impl.Subscription; public interface SubscriptionManager { boolean subscribe(String groupId, Set subscriptionDataSet, ConsumeType consumeType, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java index fb6693c9..6e6d8daa 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.client.impl; +import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java index 150bc427..03ee085d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.client.impl; +import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.remoting.RemotingChannel; public class ProducerManagerImpl extends ClientManagerImpl { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index 4a4a35e8..dffd210a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java index 7798c0df..789599fc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.snode.constant; import io.netty.util.AttributeKey; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.ClientRole; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientRole; public class SnodeConstant { public static final long HEARTBEAT_TIME_OUT = 3000; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 3a7d822d..4d9c3fec 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -20,6 +20,8 @@ import io.netty.channel.Channel; import io.netty.util.Attribute; import java.util.HashSet; import java.util.Set; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientRole; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.protocol.RequestCode; @@ -41,8 +43,6 @@ import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.constant.SnodeConstant; public class HeartbeatProcessor implements RequestProcessor { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java index 95982bc7..9b0f2720 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -36,7 +37,6 @@ import org.apache.rocketmq.remoting.interceptor.RequestContext; import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.impl.Subscription; public class PullMessageProcessor implements RequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 24a5a9c2..4a5326c8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -40,8 +42,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.Subscription; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.PushService; -- GitLab