diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java index fc9128d15f31d95eec218f6c4521c217cad1eb3d..2966a3ba72a6487d04e420e672c3a7dd85be468b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -50,6 +50,8 @@ public class MqttConfig { @ImportantField private boolean aclEnable = false; + private long houseKeepingInterval = 10 * 1000; + public int getListenPort() { return listenPort; } @@ -130,4 +132,12 @@ public class MqttConfig { public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) { this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity; } + + public long getHouseKeepingInterval() { + return houseKeepingInterval; + } + + public void setHouseKeepingInterval(long houseKeepingInterval) { + this.houseKeepingInterval = houseKeepingInterval; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/common/src/main/java/org/apache/rocketmq/common/client/Client.java similarity index 98% rename from snode/src/main/java/org/apache/rocketmq/snode/client/Client.java rename to common/src/main/java/org/apache/rocketmq/common/client/Client.java index abaf30dfd36f8f4ec91d5dfef7a133095829bd73..1719b6bc69e8e1ad01155cdd8377ba692815c588 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/Client.java @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client; +package org.apache.rocketmq.common.client; import java.util.Objects; import java.util.Set; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.serialize.LanguageCode; -import org.apache.rocketmq.snode.client.impl.ClientRole; public class Client { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java index 600dd16eb5cc16a88c1872c2b68b045060641502..c42244b5343980d436e18f2b178ef7cb91ddea07 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client; +package org.apache.rocketmq.common.client; import java.util.List; import java.util.Set; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java index d0cddceca61d28d0caf3b855ba19a2a2d815cffe..e39ca7013c43996659bc334829036ae7db32d3fb 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.common.client; import java.util.ArrayList; import java.util.Iterator; @@ -33,8 +33,6 @@ import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.ClientManager; public abstract class ClientManagerImpl implements ClientManager { @@ -45,7 +43,7 @@ public abstract class ClientManagerImpl implements ClientManager { .newSingleThreadScheduledExecutor( new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); - private final ConcurrentHashMap> groupClientTable = new ConcurrentHashMap<>( + private final ConcurrentHashMap> groupClientTable = new ConcurrentHashMap>( 1024); public abstract void onClosed(String group, RemotingChannel remotingChannel); @@ -176,7 +174,7 @@ public abstract class ClientManagerImpl implements ClientManager { public List getChannels(String groupId) { if (groupId != null) { - List result = new ArrayList<>(); + List result = new ArrayList(); ConcurrentHashMap channelsMap = this.groupClientTable.get(groupId); if (channelsMap != null) { result.addAll(this.groupClientTable.get(groupId).keySet()); @@ -189,7 +187,7 @@ public abstract class ClientManagerImpl implements ClientManager { @Override public List getAllClientId(String groupId) { - List result = new ArrayList<>(); + List result = new ArrayList(); Map channelClientMap = this.groupClientTable.get(groupId); if (channelClientMap != null) { Iterator> it = channelClientMap.entrySet() diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java similarity index 50% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java rename to common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java index 9967976d9000afe5a7349c41705d2f9b796007d7..5abe69d53873d38685ce7f1005191c756c698d00 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java @@ -1,4 +1,21 @@ -package org.apache.rocketmq.snode.client.impl;/* +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.client;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java rename to common/src/main/java/org/apache/rocketmq/common/client/Subscription.java index 895d4c9a67511ca7d04bdd31ab095d79c10fcd23..3c524b344b1cc8c1bc71584e289b51dd556e51fb 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.common.client; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -27,7 +27,7 @@ public class Subscription { private volatile MessageModel messageModel; private volatile ConsumeFromWhere consumeFromWhere; private volatile boolean cleanSession; - ConcurrentHashMap subscriptionTable = new ConcurrentHashMap<>(); + ConcurrentHashMap subscriptionTable = new ConcurrentHashMap(); private volatile long lastUpdateTimestamp = System.currentTimeMillis(); public SubscriptionData getSubscriptionData(String topic) { diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index 48295a38c0b6a7624f4b31e893e9be3eefd6e16c..637e112d1af704d959d8c051014afc2faa71d7a7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -38,4 +38,5 @@ public class LoggerName { public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark"; public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; public static final String SNODE_LOGGER_NAME = "RocketmqSnode"; + public static final String MQTT_LOGGER_NAME = "RocketmqMQTT"; } diff --git a/distribution/conf/snode.conf b/distribution/conf/snode.conf index 649057d9496653ad80748f06b4de45d45dd0cfc6..c710b8c66d54fd24c91e21e05c8bd8b664104a9c 100644 --- a/distribution/conf/snode.conf +++ b/distribution/conf/snode.conf @@ -15,4 +15,4 @@ namesrvAddr=localhost:9876 clusterName = DefaultCluster snodeName = snode-a - +embeddedModeEnable = false diff --git a/distribution/release.xml b/distribution/release.xml index 181a99ecc115f6c05e79b90dfa12dc77ff0ce6b7..60ca331900567666f643020695d5455c857fdf3c 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -67,6 +67,7 @@ org.apache.rocketmq:rocketmq-namesrv org.apache.rocketmq:rocketmq-example org.apache.rocketmq:rocketmq-openmessaging + org.apache.rocketmq:rocketmq-mqtt org.apache.rocketmq:rocketmq-snode diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java index 8c52a8f1e62da0278a3a1cf77c2bbd95cabb1952..b2c6bb3e5bdafa6dce0323894819fc12c74a2069 100644 --- a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java @@ -50,17 +50,17 @@ public class MqttSampleConsumer { log.info("Connected"); sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { - System.out.println("connection lost." + throwable.getLocalizedMessage()); + log.info("connection lost." + throwable.getLocalizedMessage()); } @Override public void messageArrived(String s, MqttMessage message) throws Exception { - System.out.println(message.toString()); + log.info(message.toString()); // System.exit(0); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { - System.out.println("delivery complete." + token.getMessage().toString()); + log.info("delivery complete." + token.getMessage().toString()); } catch (MqttException e) { e.printStackTrace(); } diff --git a/mqtt/pom.xml b/mqtt/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..01b19b8278fe43c6cfa17c2473ee17fda6691f64 --- /dev/null +++ b/mqtt/pom.xml @@ -0,0 +1,103 @@ + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-all + 4.4.1-SNAPSHOT + + rocketmq-mqtt + rocketmq-mqtt ${project.version} + + http://maven.apache.org + + UTF-8 + + + + ${project.groupId} + rocketmq-common + + + ${project.groupId} + rocketmq-store + + + ${project.groupId} + rocketmq-remoting + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-srvutil + + + ${project.groupId} + rocketmq-filter + + + ${project.groupId} + rocketmq-acl + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + com.alibaba + fastjson + + + org.javassist + javassist + + + org.slf4j + slf4j-api + + + org.yaml + snakeyaml + + + io.prometheus + simpleclient + + + io.prometheus + simpleclient_httpserver + + + io.prometheus + simpleclient_hotspot + + + ${project.groupId} + rocketmq-broker + + + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java similarity index 90% rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java index 66ec2f7ef9ef8083cef70681a65894af92166373..5f56d16b041c910f82f2ab200f6581992a19cd7b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java @@ -14,34 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.client.impl; +package org.apache.rocketmq.mqtt.client; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManagerImpl; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; public class IOTClientManagerImpl extends ClientManagerImpl { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); public static final String IOT_GROUP = "IOT_GROUP"; - private final SnodeController snodeController; private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); - public IOTClientManagerImpl(SnodeController snodeController) { - this.snodeController = snodeController; + public IOTClientManagerImpl() { } public void onUnsubscribe(Client client, List topics) { @@ -89,6 +88,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl { } iterator1.remove(); } + if (next.getValue() == null || next.getValue().size() == 0) { + iterator.remove(); + } } //remove offline messages } @@ -97,10 +99,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl { return clientId2Subscription.get(clientId); } - public SnodeController getSnodeController() { - return snodeController; - } - public ConcurrentHashMap>> getTopic2SubscriptionTable() { return topic2SubscriptionTable; } @@ -112,5 +110,4 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public void initSubscription(String clientId, Subscription subscription) { clientId2Subscription.put(clientId, subscription); } - } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java new file mode 100644 index 0000000000000000000000000000000000000000..fd2fe0f4e94860c125e5b0ea04a31a3b9e899167 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt.client; + +import io.netty.channel.Channel; +import io.netty.util.Attribute; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; + +public class MqttClientHousekeepingService implements ChannelEventListener { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final ClientManager iotClientManager; + + public MqttClientHousekeepingService(final ClientManager iotClientManager) { + this.iotClientManager = iotClientManager; + } + + public void start(long interval) { + this.iotClientManager.startScan(interval); + } + + public void shutdown() { + this.iotClientManager.shutdown(); + } + + private Client getClient(RemotingChannel remotingChannel) { + if (remotingChannel instanceof NettyChannelImpl) { + Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); + Attribute clientAttribute = channel.attr(MqttConstant.MQTT_CLIENT_ATTRIBUTE_KEY); + if (clientAttribute != null) { + Client client = clientAttribute.get(); + return client; + } + } + log.warn("RemotingChannel type error: {}", remotingChannel.getClass()); + return null; + } + + private void closeChannel(String remoteAddress, RemotingChannel remotingChannel) { + Client client = getClient(remotingChannel); + if (client != null) { + switch (client.getClientRole()) { + case IOTCLIENT: + this.iotClientManager.onClose(client.getGroups(), remotingChannel); + return; + default: + } + } + log.warn("Close channel without any role"); + } + + @Override + public void onChannelConnect(String remoteAddr, RemotingChannel channel) { + log.info("Remoting channel connected: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress())); + + } + + @Override + public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + } + + @Override + public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + + } + + @Override + public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { + log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + closeChannel(remoteAddr, remotingChannel); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java similarity index 78% rename from snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java index 2d2ecaa251a017e5fe91f96f9d7e205b20fa5bd3..9e803a39da407b2b9044d4afd6cbe42fadaf0bb3 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java @@ -15,11 +15,16 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.constant; +package org.apache.rocketmq.mqtt.constant; + +import io.netty.util.AttributeKey; +import org.apache.rocketmq.common.client.Client; public class MqttConstant { public static final int MAX_SUPPORTED_QOS = 0; public static final String SUBSCRIPTION_FLAG_PLUS = "+"; public static final String SUBSCRIPTION_FLAG_SHARP = "#"; public static final String SUBSCRIPTION_SEPARATOR = "/"; + public static final long DEFAULT_TIMEOUT_MILLS = 3000L; + public static final AttributeKey MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java index f636842f94292aab3695def6e8c28d1abe164bf0..93eb733378e79c9324b8a4e29d978e35de0d97ba 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.exception; +package org.apache.rocketmq.mqtt.exception; public class MqttConnectException extends RuntimeException { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java index 355c7f7732cd194db36170d84c2ac92e5425974f..d008d78dcc18462c9849a13d632080d9e619e21f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.exception; +package org.apache.rocketmq.mqtt.exception; public class WrongMessageTypeException extends RuntimeException { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java similarity index 95% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java index 241531655445ceb025939578be1e965c03834e76..fa0be8bce67f37197c8838b1964a4bf1ef807cce 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler; import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.rocketmq.remoting.RemotingChannel; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java index 96f2843b9839da4071842aaa7d7fb9170c6457f2..ecbb12476837a28fea9bfc194756214a14bd05ad 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; @@ -25,32 +25,32 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.HashSet; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; +import org.apache.rocketmq.common.client.ClientRole; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.exception.MqttConnectException; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.ClientManager; -import org.apache.rocketmq.snode.client.impl.ClientRole; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.exception.MqttConnectException; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; public class MqttConnectMessageHandler implements MessageHandler { - - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttConnectMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttConnectMessageHandler(DefaultMqttMessageProcessor defaultMqttMessageProcessor) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; } @Override @@ -97,7 +97,7 @@ public class MqttConnectMessageHandler implements MessageHandler { remotingChannel.close(); return null; } - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); //set Session Present according to whether the server has already stored Session State for the clientId if (mqttConnectMessage.variableHeader().isCleanSession()) { mqttHeader.setSessionPresent(false); @@ -120,7 +120,11 @@ public class MqttConnectMessageHandler implements MessageHandler { Client client = new Client(); client.setClientId(payload.clientIdentifier()); client.setClientRole(ClientRole.IOTCLIENT); - client.setGroups(new HashSet(){{add("IOT_GROUP");}}); + client.setGroups(new HashSet() { + { + add("IOT_GROUP"); + } + }); client.setConnected(true); client.setRemotingChannel(remotingChannel); client.setLastUpdateTimestamp(System.currentTimeMillis()); @@ -138,7 +142,7 @@ public class MqttConnectMessageHandler implements MessageHandler { willMessage.setWillTopic(payload.willTopic()); willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain()); willMessage.setBody(payload.willMessageInBytes()); - snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); + defaultMqttMessageProcessor.getWillMessageService().saveWillMessage(client.getClientId(), willMessage); } mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name()); @@ -148,7 +152,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } private boolean alreadyStoredSession(String clientId) { - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId); if (subscription == null) { return false; @@ -164,7 +168,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } private boolean isConnected(RemotingChannel remotingChannel, String clientId) { - ClientManager iotClientManager = snodeController.getIotClientManager(); + ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { return true; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java similarity index 78% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java index 66affac46f45ad59ea5974f918df091495805249..5f21a4b2fca51a665d3ced9f4707984f311e8d7a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java @@ -15,27 +15,27 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; public class MqttDisconnectMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; - - public MqttDisconnectMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttDisconnectMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** @@ -56,10 +56,10 @@ public class MqttDisconnectMessageHandler implements MessageHandler { } //discard will message associated with the current connection(client) - Client client = snodeController.getIotClientManager() + Client client = defaultMqttMessageProcessor.getIotClientManager() .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null) { - snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); + defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId()); } client.setConnected(false); if (remotingChannel.isActive()) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java similarity index 66% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java index eaba1a4a02d6cc328113f35de0c75dc9e78390ad..7cc9d0299cf3f028c4ef0067fb740d3d1c137795 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java @@ -15,24 +15,23 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttMessageForwarder implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; -/* private SubscriptionStore subscriptionStore; - - public MqttMessageForwarder(SubscriptionStore subscriptionStore) { - this.subscriptionStore = subscriptionStore; - }*/ - - public MqttMessageForwarder(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttMessageForwarder(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java similarity index 66% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java index 624a4bf32e7386b69e12c49a348914b663c3c114..e0e2ddff560faf2353c609d18ed298d85e76d97e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java @@ -15,20 +15,25 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttMessageSender implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttMessageSender(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttMessageSender(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * send the PUBLISH message to client * diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java similarity index 58% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java index 1f75fcc64e7f4e7773c1418f636f9a95abb2291e..d867476fd2da9466de6c1255046900c02e2aa69a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java @@ -15,29 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPingreqMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPingreqMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPingreqMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** - * handle the PINGREQ message from client - *
    - *
  1. check client exists
  2. - *
  3. check client is connected
  4. - *
  5. generate the PINGRESP message
  6. - *
  7. send the PINGRESP message to the client
  8. - *
+ * handle the PINGREQ message from client
  1. check client exists
  2. check client is connected
  3. + *
  4. generate the PINGRESP message
  5. send the PINGRESP message to the client
* * @param message * @return diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java similarity index 59% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java index 69dee2694b12729b6f9c47ae8434f2e82bd8acf4..71d94589955b151bc319d55b2af108dec9c851e4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java @@ -15,26 +15,30 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubackMessageHandler implements MessageHandler { - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttPubackMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubackMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** - * handle the PUBACK message from the client - *
    - *
  1. remove the message from the published in-flight messages
  2. - *
  3. ack the message in the MessageStore
  4. - *
+ * handle the PUBACK message from the client
  1. remove the message from the published in-flight messages
  2. + *
  3. ack the message in the MessageStore
+ * * @param * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java index e191f5a4491764b77bca83ea28e779b509e22550..2b1109a79a5cabd6defb81910781a94be9917ebe 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java @@ -15,22 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubcompMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubcompMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubcompMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * handle the PUBCOMP message from the client + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java similarity index 84% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java index 17776ea0f452c7831d3babe2f2f7bb88cb6f6edc..15552b018e8aeeba0d3cc2ed0e94d8f0876dc6ab 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttFixedHeader; @@ -27,21 +27,20 @@ import io.netty.handler.codec.mqtt.MqttQoS; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.util.MqttUtil; public class MqttPublishMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - - private final SnodeController snodeController; - - public MqttPublishMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } @Override @@ -61,7 +60,7 @@ public class MqttPublishMessageHandler implements MessageHandler { ByteBuf payload = mqttPublishMessage.payload(); if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { - snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); + defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { //Push messages to subscribers and add it to IN-FLIGHT messages } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java index edd0d55c30c2ee25f868d85dc00ed32da96b51f4..e2f10290d905c3bcb5f25401c7397fac5abf287b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java @@ -15,22 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubrecMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubrecMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubrecMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } + /** * handle the PUBREC message from the clinet + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java similarity index 65% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java index c1061e4db6fda8fb3ce449b381069f9dbdf2c9aa..284a460e0d0be97aa10cf5894598ee2127e31a8a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java @@ -15,23 +15,28 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; public class MqttPubrelMessageHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private final SnodeController snodeController; - - public MqttPubrelMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPubrelMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** * handle the PUBREL message from the client + * * @param message * @return */ diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java similarity index 90% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java index 34f91d367cd89ba2f3e5d816067759f1b46534ad..31cfd0c2c0774b742e6e599563116fc408e21916 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -29,31 +29,32 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.constant.MqttConstant; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; public class MqttSubscribeMessageHandler implements MessageHandler { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttSubscribeMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttSubscribeMessageHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } /** @@ -72,7 +73,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { } MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message; MqttSubscribePayload payload = mqttSubscribeMessage.payload(); - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client == null) { log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); @@ -135,7 +136,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { //2.update topic2SubscriptionTable String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName()); ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); - if (client2SubscriptionData == null) { + if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) { client2SubscriptionData = new ConcurrentHashMap<>(); ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData); if (prev != null) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java index 38f6519dfbc3673a11572d6f5cdb69cbefdf3a25..34c7fdad84092dd670ed1118aba7d510838f919d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor.mqtthandler; +package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; @@ -27,31 +27,31 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.client.impl.Subscription; -import org.apache.rocketmq.snode.exception.WrongMessageTypeException; -import org.apache.rocketmq.snode.util.MqttUtil; /** * handle the UNSUBSCRIBE message from the client */ public class MqttUnsubscribeMessagHandler implements MessageHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final SnodeController snodeController; - - public MqttUnsubscribeMessagHandler(SnodeController snodeController) { - this.snodeController = snodeController; + public MqttUnsubscribeMessagHandler(DefaultMqttMessageProcessor processor) { + this.defaultMqttMessageProcessor = processor; } @Override @@ -73,7 +73,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { remotingChannel.close(); return null; } - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client == null) { log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); @@ -98,7 +98,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { } private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) { - ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); + ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); for (String topicFilter : topics) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java similarity index 64% rename from snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 9eb62c240cd8fd8337719d8c46e4f81c8c162414..563fdac4378a14c3d25680ece7651764b187546c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt.processor; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -33,55 +33,73 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MqttClientHousekeepingService; +import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubcompMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler; +import org.apache.rocketmq.mqtt.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl; +import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private Map type2handler = new HashMap<>(); - private final SnodeController snodeController; private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; + private WillMessageService willMessageService; + private MqttPushServiceImpl mqttPushService; + private ClientManager iotClientManager; + private RemotingServer mqttRemotingServer; + private MqttClientHousekeepingService mqttClientHousekeepingService; + private MqttConfig mqttConfig; + + public DefaultMqttMessageProcessor(MqttConfig mqttConfig, RemotingServer mqttRemotingServer) { + this.willMessageService = new WillMessageServiceImpl(); + this.mqttPushService = new MqttPushServiceImpl(this, mqttConfig); + this.iotClientManager = new IOTClientManagerImpl(); + this.mqttRemotingServer = mqttRemotingServer; + this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager); + this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval()); - public DefaultMqttMessageProcessor(SnodeController snodeController) { - this.snodeController = snodeController; registerMessageHandler(MqttMessageType.CONNECT, - new MqttConnectMessageHandler(this.snodeController)); + new MqttConnectMessageHandler(this)); registerMessageHandler(MqttMessageType.DISCONNECT, - new MqttDisconnectMessageHandler(this.snodeController)); + new MqttDisconnectMessageHandler(this)); registerMessageHandler(MqttMessageType.PINGREQ, - new MqttPingreqMessageHandler(this.snodeController)); + new MqttPingreqMessageHandler(this)); registerMessageHandler(MqttMessageType.PUBLISH, - new MqttPublishMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController)); + new MqttPublishMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); registerMessageHandler(MqttMessageType.PUBCOMP, - new MqttPubcompMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController)); - registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController)); + new MqttPubcompMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); + registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); registerMessageHandler(MqttMessageType.SUBSCRIBE, - new MqttSubscribeMessageHandler(this.snodeController)); + new MqttSubscribeMessageHandler(this)); registerMessageHandler(MqttMessageType.UNSUBSCRIBE, - new MqttUnsubscribeMessagHandler(this.snodeController)); + new MqttUnsubscribeMessagHandler(this)); } @Override @@ -127,4 +145,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { type2handler.put(type, handler); } + + public WillMessageService getWillMessageService() { + return willMessageService; + } + + public MqttPushServiceImpl getMqttPushService() { + return mqttPushService; + } + + public ClientManager getIotClientManager() { + return iotClientManager; + } + + public MqttConfig getMqttConfig() { + return mqttConfig; + } + + public RemotingServer getMqttRemotingServer() { + return mqttRemotingServer; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java index 516bde5f5f5176cd1840bb8674868ff240a842ba..f88d45308a3586ef1cb3cb28606c154928176f72 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.service; +package org.apache.rocketmq.mqtt.service; import org.apache.rocketmq.common.message.mqtt.WillMessage; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java similarity index 86% rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java index e853025996dd2afb4eec14cff42d906599fd93c1..9b9b17e10104e2eb3aa7bfe3caad266b5cae4fe8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.service.impl; +package org.apache.rocketmq.mqtt.service.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -27,37 +27,38 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.constant.SnodeConstant; -import org.apache.rocketmq.snode.util.MqttUtil; public class MqttPushServiceImpl { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); - private SnodeController snodeController; private ExecutorService pushMqttMessageExecutorService; + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; - public MqttPushServiceImpl(final SnodeController snodeController) { - this.snodeController = snodeController; + public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor( - this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(), - this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(), + mqttConfig.getPushMqttMessageMinPoolSize(), + mqttConfig.getPushMqttMessageMaxPoolSize(), 3000, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()), + new ArrayBlockingQueue<>(mqttConfig.getPushMqttMessageThreadPoolQueueCapacity()), "pushMqttMessageThread", false); } @@ -86,7 +87,7 @@ public class MqttPushServiceImpl { RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); //find those clients publishing the message to - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager(); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); Set clients = new HashSet<>(); if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) { @@ -109,12 +110,11 @@ public class MqttPushServiceImpl { byte[] body = new byte[message.readableBytes()]; message.readBytes(body); requestCommand.setBody(body); - snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS); } } catch (Exception ex) { log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); } finally { - System.out.println("Release Bytebuf"); ReferenceCountUtil.release(message); } } else { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java similarity index 81% rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java index 78b4c1e37d0d2c20e83df8c50b1eda6a71bddeae..a769d907f2b0e3e4e4ad40bd89abf9f7436d2815 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java @@ -15,20 +15,17 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.service.impl; +package org.apache.rocketmq.mqtt.service.impl; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.WillMessageService; public class WillMessageServiceImpl implements WillMessageService { private static ConcurrentHashMap willMessageTable = new ConcurrentHashMap<>(); - private final SnodeController snodeController; - public WillMessageServiceImpl(SnodeController snodeController) { - this.snodeController = snodeController; + public WillMessageServiceImpl() { } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java similarity index 96% rename from snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java index ef44a7acdb4bf6ff080fe1c7fd3c762487ae7af3..3eefc48ea76e987c83d84a9d725543ac20885e9c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.rocketmq.snode.util; +package org.apache.rocketmq.mqtt.util; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.UUID; -import org.apache.rocketmq.snode.constant.MqttConstant; +import org.apache.rocketmq.mqtt.constant.MqttConstant; public class MqttUtil { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java similarity index 92% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java index 076a005257906b413eef205dd86ae72e6690dc3b..570a86403e65a3b155ce0cfb9c12c7bac6ab5410 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java @@ -14,38 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.io.UnsupportedEncodingException; import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer; import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil; -import org.apache.rocketmq.snode.SnodeController; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class DefaultMqttMessageProcessorTest { private DefaultMqttMessageProcessor defaultMqttMessageProcessor; - @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); - @Mock private RemotingChannel remotingChannel; + @Mock + private MqttRemotingServer mqttRemotingServer; + private String topic = "SnodeTopic"; private String group = "SnodeGroup"; @@ -54,7 +53,7 @@ public class DefaultMqttMessageProcessorTest { @Before public void init() { - defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController); + defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer); } @Test diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java similarity index 85% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java index b0301e7e9b59d4085fe7b5707ffbac6117a9d98a..0bc29091f9b4d9aaff5db38018ea819b97614213 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; @@ -22,11 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -38,10 +36,13 @@ public class MqttConnectMessageHandlerTest { @Mock private RemotingChannel remotingChannel; + @Mock + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + @Test public void testHandlerMessage() throws Exception { - MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig())); + MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader( MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes())); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java similarity index 73% rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java index 0f474b1307fd5231ca860b5f5801e400f447c069..088b6230419bda546baedabf04faecb857239f01 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java @@ -14,20 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.processor; +package org.apache.rocketmq.mqtt; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.message.mqtt.WillMessage; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; -import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -39,17 +37,19 @@ public class MqttDisconnectMessageHandlerTest { @Mock private RemotingChannel remotingChannel; + @Mock + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + @Test public void testHandlerMessage() throws Exception { - SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( - snodeController); + defaultMqttMessageProcessor); Client client = new Client(); client.setRemotingChannel(remotingChannel); client.setClientId("123456"); - snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); - snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage()); + defaultMqttMessageProcessor.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); + defaultMqttMessageProcessor.getWillMessageService().saveWillMessage("123456", new WillMessage()); MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader( MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java similarity index 71% rename from snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java index 57f7c7a52346c10fa1aa36d21e613238a0039870..dfce114839497daa10782fa9f481d32f3e59ad9f 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java @@ -14,31 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.service; +package org.apache.rocketmq.mqtt; -import org.apache.rocketmq.common.MqttConfig; -import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.message.mqtt.WillMessage; -import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; +import org.apache.rocketmq.mqtt.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class WillMessageServiceImplTest extends SnodeTestBase { - - @Spy - private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig()); +public class WillMessageServiceImplTest { private WillMessageService willMessageService; @Before public void init() { - willMessageService = new WillMessageServiceImpl(snodeController); + willMessageService = new WillMessageServiceImpl(); } @Test diff --git a/pom.xml b/pom.xml index 3899247f27951b60e69f9a2c7247a7f6579479b5..ed34c04dc021b054e12ae94c6c2c83ac735ddefe 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,7 @@ distribution openmessaging logging + mqtt snode acl @@ -533,6 +534,11 @@ rocketmq-snode ${project.version} + + ${project.groupId} + rocketmq-mqtt + ${project.version} + org.slf4j slf4j-api diff --git a/snode/pom.xml b/snode/pom.xml index da1e3f3ef19a7155b87eae1146c21020d18a6356..17158c0c12cc664c03db43970d22fd0cef20f905 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -97,6 +97,10 @@ ${project.groupId} rocketmq-broker + + ${project.groupId} + rocketmq-mqtt + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 6f54ae0225c9845a17a0e2bfee45397ca9788e46..d3a48b7871556e26acf618b666789eb6e17b1872 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -27,11 +27,13 @@ import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClientFactory; @@ -48,18 +50,15 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.snode.client.ClientHousekeepingService; -import org.apache.rocketmq.snode.client.ClientManager; import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.client.SubscriptionManager; import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl; -import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; -import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.PullMessageProcessor; import org.apache.rocketmq.snode.processor.SendMessageProcessor; @@ -69,16 +68,13 @@ import org.apache.rocketmq.snode.service.MetricsService; import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.ScheduledService; -import org.apache.rocketmq.snode.service.WillMessageService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; -import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; -import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; public class SnodeController { @@ -105,7 +101,7 @@ public class SnodeController { private ScheduledService scheduledService; private ClientManager producerManager; private ClientManager consumerManager; - private ClientManager iotClientManager; +// private ClientManager iotClientManager; private SubscriptionManager subscriptionManager; private ClientHousekeepingService clientHousekeepingService; private SubscriptionGroupManager subscriptionGroupManager; @@ -122,8 +118,8 @@ public class SnodeController { private ClientService clientService; private SlowConsumerService slowConsumerService; private MetricsService metricsService; - private WillMessageService willMessageService; - private MqttPushServiceImpl mqttPushService; +// private WillMessageService willMessageService; +// private MqttPushServiceImpl mqttPushService; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( @@ -153,7 +149,12 @@ public class SnodeController { if (this.mqttRemotingClient != null) { this.mqttRemotingClient.init(this.mqttClientConfig, null); } - + this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( + RemotingUtil.MQTT_PROTOCOL); + if (this.mqttRemotingServer != null) { + this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); + this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + } this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(), @@ -211,19 +212,19 @@ public class SnodeController { this.sendMessageProcessor = new SendMessageProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this); - this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this); + this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer); this.pushService = new PushServiceImpl(this); this.clientService = new ClientServiceImpl(this); this.subscriptionManager = new SubscriptionManagerImpl(); this.producerManager = new ProducerManagerImpl(); this.consumerManager = new ConsumerManagerImpl(this); - this.iotClientManager = new IOTClientManagerImpl(this); +// this.iotClientManager = new IOTClientManagerImpl(this); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, - this.consumerManager, this.iotClientManager); + this.consumerManager, null); this.slowConsumerService = new SlowConsumerServiceImpl(this); this.metricsService = new MetricsServiceImpl(); - this.willMessageService = new WillMessageServiceImpl(this); - this.mqttPushService = new MqttPushServiceImpl(this); +// this.willMessageService = new WillMessageServiceImpl(this); +// this.mqttPushService = new MqttPushServiceImpl(this); } public SnodeConfig getSnodeConfig() { @@ -258,12 +259,6 @@ public class SnodeController { this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService); this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); } - this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL); - if (this.mqttRemotingServer != null) { - this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); - this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); - } registerProcessor(); return true; } @@ -507,13 +502,13 @@ public class SnodeController { this.consumerManager = consumerManager; } - public ClientManager getIotClientManager() { +/* public ClientManager getIotClientManager() { return iotClientManager; } public void setIotClientManager(ClientManager iotClientManager) { this.iotClientManager = iotClientManager; - } + }*/ public SubscriptionManager getSubscriptionManager() { return subscriptionManager; @@ -551,7 +546,7 @@ public class SnodeController { this.metricsService = metricsService; } - public WillMessageService getWillMessageService() { +/* public WillMessageService getWillMessageService() { return willMessageService; } @@ -566,5 +561,5 @@ public class SnodeController { public void setMqttPushService(MqttPushServiceImpl mqttPushService) { this.mqttPushService = mqttPushService; - } + }*/ } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 899a9ef24e0cf7d5474ab76c6cecbe58536ad681..590f479b2be5aeca0a8783f14ccda2c5ef642550 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.client; import io.netty.channel.Channel; import io.netty.util.Attribute; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -49,7 +51,7 @@ public class ClientHousekeepingService implements ChannelEventListener { public void shutdown() { this.producerManager.shutdown(); this.consumerManager.shutdown(); - this.iotClientManager.shutdown(); +// this.iotClientManager.shutdown(); } private Client getClient(RemotingChannel remotingChannel) { @@ -75,9 +77,9 @@ public class ClientHousekeepingService implements ChannelEventListener { case Producer: this.producerManager.onClose(client.getGroups(), remotingChannel); return; - case IOTCLIENT: - this.iotClientManager.onClose(client.getGroups(), remotingChannel); - return; +// case IOTCLIENT: +// this.iotClientManager.onClose(client.getGroups(), remotingChannel); +// return; default: } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java index a648ba812960f42ec1217ac03fecd43be3c513e7..4247a4d2fc1d3a434e0009d2afd9194839634818 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.snode.client; import java.util.Set; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.RemotingChannel; -import org.apache.rocketmq.snode.client.impl.Subscription; public interface SubscriptionManager { boolean subscribe(String groupId, Set subscriptionDataSet, ConsumeType consumeType, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java index fb6693c92ec9cdc026c30eff58ae3249bab29f56..6e6d8daab18f8cb5491061563955f930d9647946 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.client.impl; +import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java index 150bc4277a04a67664eea8bc0b917b967c60a984..03ee085d0be81b40dcdb37aae82d0279184b1289 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.client.impl; +import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.remoting.RemotingChannel; public class ProducerManagerImpl extends ClientManagerImpl { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index 4a4a35e881bd1860f25e988a9674fc9186e47d51..dffd210a9f0c1a0b02fbd0e66538ebacc4bd2fec 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java index 7798c0df78c5348ce65c71b459cfce27d3626e64..789599fc08cd7a7210335a77a225512041f4222a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.snode.constant; import io.netty.util.AttributeKey; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.ClientRole; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientRole; public class SnodeConstant { public static final long HEARTBEAT_TIME_OUT = 3000; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 3a7d822db37101e3e3c4e118c86cd1e0299409ef..4d9c3fec73a03ceeb55bef35bbdfcbed5c32d023 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -20,6 +20,8 @@ import io.netty.channel.Channel; import io.netty.util.Attribute; import java.util.HashSet; import java.util.Set; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.ClientRole; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.protocol.RequestCode; @@ -41,8 +43,6 @@ import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.constant.SnodeConstant; public class HeartbeatProcessor implements RequestProcessor { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java index 95982bc7c7c6528ebe50bd89259d2beca0405f65..9b0f2720b20e7def252ed862e6cfd04ea774b2b8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -36,7 +37,6 @@ import org.apache.rocketmq.remoting.interceptor.RequestContext; import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.impl.Subscription; public class PullMessageProcessor implements RequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 24a5a9c290b83e983e3e82f7df6b7af3fc0c390d..4a5326c8bc8e8db6260053844317632ca1227c6f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -40,8 +42,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; -import org.apache.rocketmq.snode.client.Client; -import org.apache.rocketmq.snode.client.impl.Subscription; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.PushService;