diff --git a/broker/pom.xml b/broker/pom.xml index 01390fd3c534a63ebb65f87636f4b7bc4913a909..ff7eb6b2ebe936ed401af5cec5c1f0015b0e2b9f 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -34,6 +34,7 @@ ${project.groupId} rocketmq-remoting + ${project.groupId} rocketmq-client @@ -50,6 +51,10 @@ ${project.groupId} rocketmq-acl + + ${project.groupId} + rocketmq-mqtt + ch.qos.logback logback-classic diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index d9a618b65bd776afaa8354ca748da572de48481d..53b1357a6ea25b202ea6b2cd0d5b1a6c20832639 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.AdminBrokerProcessor; import org.apache.rocketmq.broker.processor.ClientManageProcessor; import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; import org.apache.rocketmq.broker.processor.EndTransactionProcessor; +import org.apache.rocketmq.broker.processor.MQTTProcessor; import org.apache.rocketmq.broker.processor.PullMessageProcessor; import org.apache.rocketmq.broker.processor.QueryMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; @@ -97,7 +98,9 @@ import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer; import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.srvutil.FileWatchService; +import org.apache.rocketmq.store.DefaultMQTTInfoStore; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MQTTInfoStore; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.BrokerRole; @@ -139,11 +142,14 @@ public class BrokerController { private final BlockingQueue heartbeatThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; private final BlockingQueue endTransactionThreadPoolQueue; + private final BlockingQueue mqttThreadPoolQueue; private final FilterServerManager filterServerManager; private final BrokerStatsManager brokerStatsManager; private final List sendMessageHookList = new ArrayList(); private final List consumeMessageHookList = new ArrayList(); private MessageStore messageStore; + private MQTTProcessor mqttProcessor; + private MQTTInfoStore mqttInfoStore; private RemotingServer remotingServer; private RemotingServer fastRemotingServer; private TopicConfigManager topicConfigManager; @@ -155,6 +161,7 @@ public class BrokerController { private ExecutorService heartbeatExecutor; private ExecutorService consumerManageExecutor; private ExecutorService endTransactionExecutor; + private ExecutorService mqttMessageExecutor; private boolean updateMasterHAServerAddrPeriodically = false; private BrokerStats brokerStats; private InetSocketAddress storeHost; @@ -204,6 +211,7 @@ public class BrokerController { this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); + this.mqttThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getMqttThreadPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); @@ -319,6 +327,15 @@ public class BrokerController { this.endTransactionThreadPoolQueue, new ThreadFactoryImpl("EndTransactionThread_")); + this.mqttMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getMqttMessageThreadPoolNums(), + this.brokerConfig.getMqttMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.mqttThreadPoolQueue, + new ThreadFactoryImpl("MQTTThread_") + ); + this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); @@ -475,6 +492,14 @@ public class BrokerController { } initialTransaction(); } + try { + this.mqttInfoStore = new DefaultMQTTInfoStore(); + mqttInfoStore.load(); + mqttInfoStore.start(); + } catch (Exception e) { + log.error("Open MQTT Database failed, error: {}", e); + } + return result; } @@ -608,6 +633,30 @@ public class BrokerController { this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); + /** + * MQTTProcessor + */ + mqttProcessor = new MQTTProcessor(this); + this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor); /** * Default */ @@ -1243,4 +1292,13 @@ public class BrokerController { public ConsumerManageProcessor getConsumerManageProcessor() { return consumerManageProcessor; } + + public MQTTInfoStore getMqttInfoStore() { + return mqttInfoStore; + } + + public MQTTProcessor getMqttProcessor() { + return mqttProcessor; + } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..0ebc062cfdd4321902a2ebb702901e9b3505964b --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.processor; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.MQTTInfoStore; + +public class MQTTProcessor implements RequestProcessor { + private final BrokerController brokerController; + private static final Gson GSON = new Gson(); + private final MQTTInfoStore mqttInfoStore; + + public MQTTProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + this.mqttInfoStore = this.brokerController.getMqttInfoStore(); + } + + @Override public boolean rejectRequest() { + return false; + } + + @Override + public RemotingCommand processRequest(RemotingChannel remotingChannel, + RemotingCommand request) throws RemotingCommandException { + + switch (request.getCode()) { + case RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED: + return this.isClient2SubscriptionPersistedHandler(remotingChannel, request); + case RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION: + return this.addOrUpdateClient2Subscription(remotingChannel, request); + case RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION: + return this.deleteClient2Subscription(request); + case RequestCode.MQTT_GET_SNODEADDRESS2CLIENT: + return this.getSnodeAddress2Clients(request); + case RequestCode.MQTT_CLIENT_UNSUBSRIBE: + return this.clientUnsubscribe(request); + case RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS: + return this.addorUpdateRootTopic2Clients(request); + case RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS: + return this.getRootTopic2Clients(request); + case RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT: + return this.deleteRootTopic2Client(request); + case RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID: + return this.getSubscriptionByClientId(request); + case RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID: + return this.getClientByClientId(request); + default: + return null; + } + } + + private RemotingCommand getClientByClientId(RemotingCommand request) throws RemotingCommandException { + GetClientByClientIdRequestHeader requestHeader = (GetClientByClientIdRequestHeader) request.decodeCommandCustomHeader(GetClientByClientIdRequestHeader.class); + String clientId = requestHeader.getClientId(); + Client client = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX), MQTTSession.class); + RemotingCommand response = RemotingCommand.createResponseCommand(GetClientByClientIdResponseHeader.class); + GetClientByClientIdResponseHeader responseHeader = (GetClientByClientIdResponseHeader) response.readCustomHeader(); + responseHeader.setClient(client); + return response; + } + + private RemotingCommand getSubscriptionByClientId(RemotingCommand request) throws RemotingCommandException { + GetSubscriptionByClientIdRequestHeader requestHeader = (GetSubscriptionByClientIdRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionByClientIdRequestHeader.class); + String clientId = requestHeader.getClientId(); + Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class); + RemotingCommand response = RemotingCommand.createResponseCommand(GetSubscriptionByClientIdResponseHeader.class); + GetSubscriptionByClientIdResponseHeader responseHeader = (GetSubscriptionByClientIdResponseHeader) response.readCustomHeader(); + responseHeader.setSubscription(subscription); + return response; + } + + private RemotingCommand isClient2SubscriptionPersistedHandler(final RemotingChannel remotingChannel, + final RemotingCommand request) throws RemotingCommandException { + RemotingCommand response = RemotingCommand.createResponseCommand(IsClient2SubscriptionPersistedResponseHeader.class); + IsClient2SubscriptionPersistedResponseHeader responseHeader = (IsClient2SubscriptionPersistedResponseHeader) response.readCustomHeader(); + IsClient2SubscriptionPersistedRequestHeader requestHeader = (IsClient2SubscriptionPersistedRequestHeader) request.decodeCommandCustomHeader(IsClient2SubscriptionPersistedRequestHeader.class); + + String clientId = requestHeader.getClientId(); + boolean cleanSession = requestHeader.isCleanSession(); + + String clientJson = mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX); + if (clientJson != null) { + MQTTSession client = GSON.fromJson(clientJson, MQTTSession.class); + if (client.isCleanSession() != cleanSession) { + client.setCleanSession(cleanSession); + mqttInfoStore.putData(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX, GSON.toJson(client)); + } + responseHeader.setPersisted(true); + } else { + responseHeader.setPersisted(false); + } + + mqttInfoStore.putData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString()); + + return response; + } + + private RemotingCommand addOrUpdateClient2Subscription(final RemotingChannel remotingChannel, + final RemotingCommand request) throws RemotingCommandException { + AddOrUpdateClient2SubscriptionRequestHeader requestHeader = (AddOrUpdateClient2SubscriptionRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateClient2SubscriptionRequestHeader.class); + Client client = requestHeader.getClient(); + Subscription subscription = requestHeader.getSubscription(); + + boolean client2SubResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX, GSON.toJson(subscription)); + boolean client2SnoResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString()); + boolean client2EntityResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_CLIENT_SUFFIX, GSON.toJson(client)); + + RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateClient2SubscriptionResponseHeader.class); + AddOrUpdateClient2SubscriptionResponseHeader responseHeader = (AddOrUpdateClient2SubscriptionResponseHeader) response.readCustomHeader(); + responseHeader.setResult(client2SubResult && client2SnoResult && client2EntityResult); + return response; + } + + private RemotingCommand deleteClient2Subscription(final RemotingCommand request) throws RemotingCommandException { + DeleteClientRequestHeader requestHeader = (DeleteClientRequestHeader) request.decodeCommandCustomHeader(DeleteClientRequestHeader.class); + String clientId = requestHeader.getClientId(); + String subscriptionJson = this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX); + Subscription subscription = GSON.fromJson(subscriptionJson, Subscription.class); + boolean operationSuccess = this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX) && this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX) && this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX); + RemotingCommand response = RemotingCommand.createResponseCommand(DeleteClientResponseHeader.class); + DeleteClientResponseHeader responseHeader = (DeleteClientResponseHeader) response.readCustomHeader(); + responseHeader.setOperationSuccess(operationSuccess); + responseHeader.setSubscription(subscription); + return response; + } + + private RemotingCommand getSnodeAddress2Clients(final RemotingCommand request) throws RemotingCommandException { + Map> snodeAddress2Clients = new ConcurrentHashMap<>(); + Set clients = new HashSet<>(); + GetSnodeAddress2ClientsRequestHeader requestHeader = (GetSnodeAddress2ClientsRequestHeader) request.decodeCommandCustomHeader(GetSnodeAddress2ClientsRequestHeader.class); + String topic = requestHeader.getTopic(); + Set clientsId = requestHeader.getClientsId(); + for (String clientId : clientsId) { + ConcurrentHashMap subscriptionTable = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class).getSubscriptionTable(); + for (String topicFilter : subscriptionTable.keySet()) { + if (isMatch(topicFilter, topic)) { + clients.add(GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX), MQTTSession.class)); + } + } + } + for (Client client : clients) { + String snodeAddress = this.mqttInfoStore.getValue(client.getClientId() + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX); + Set clientsTmp = snodeAddress2Clients.getOrDefault(snodeAddress, new HashSet<>()); + clientsTmp.add(client); + } + + RemotingCommand response = RemotingCommand.createResponseCommand(GetSnodeAddress2ClientsResponseHeader.class); + GetSnodeAddress2ClientsResponseHeader responseHeader = (GetSnodeAddress2ClientsResponseHeader) response.readCustomHeader(); + responseHeader.setSnodeAddress2Clients(snodeAddress2Clients); + + return response; + } + + private RemotingCommand clientUnsubscribe(final RemotingCommand request) throws RemotingCommandException { + ClientUnsubscribeRequestHeader requestHeader = (ClientUnsubscribeRequestHeader) request.decodeCommandCustomHeader(ClientUnsubscribeRequestHeader.class); + String clientId = requestHeader.getClientId(); + List topics = requestHeader.getTopics(); + Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class); + ConcurrentHashMap subscriptionTable = subscription.getSubscriptionTable(); + Set rootTopicsBefore = subscriptionTable.keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).collect(Collectors.toSet()); + for (String topic : topics) { + subscriptionTable.remove(topic); + } + Set rootTopicAfter = subscriptionTable.keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).collect(Collectors.toSet()); + Set rootTopicsDiff = new HashSet<>(); + rootTopicsDiff.addAll(rootTopicsBefore); + rootTopicsDiff.removeAll(rootTopicAfter); + + subscription.setSubscriptionTable(subscriptionTable); + boolean result = this.mqttInfoStore.putData(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX, GSON.toJson(subscription)); + RemotingCommand response = RemotingCommand.createResponseCommand(ClientUnsubscribeResponseHeader.class); + ClientUnsubscribeResponseHeader responseHeader = (ClientUnsubscribeResponseHeader) response.readCustomHeader(); + responseHeader.setOperationSuccess(result); + if (rootTopicsDiff.size() != 0) { + responseHeader.setRootTopicDiffExists(true); + responseHeader.setRootTopicsDiff(rootTopicsDiff); + } + return response; + } + + private RemotingCommand addorUpdateRootTopic2Clients( + final RemotingCommand request) throws RemotingCommandException { + AddOrUpdateRootTopic2ClientsRequestHeader requestHeader = (AddOrUpdateRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateRootTopic2ClientsRequestHeader.class); + + String rootTopic = requestHeader.getRootTopic(); + String clientId = requestHeader.getClientId(); + String value = this.mqttInfoStore.getValue(rootTopic); + Set clientsId; + + if (value != null) { + clientsId = GSON.fromJson(value, new TypeToken>() { + }.getType()); + } else { + clientsId = new HashSet<>(); + } + clientsId.add(clientId); + + RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateRootTopic2ClientsResponseHeader.class); + AddOrUpdateRootTopic2ClientsResponseHeader responseHeader = (AddOrUpdateRootTopic2ClientsResponseHeader) response.readCustomHeader(); + responseHeader.setOperationSuccess(this.mqttInfoStore.putData(rootTopic, GSON.toJson(clientsId))); + + return response; + } + + private RemotingCommand getRootTopic2Clients(final RemotingCommand request) throws RemotingCommandException { + GetRootTopic2ClientsRequestHeader requestHeader = (GetRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(GetRootTopic2ClientsRequestHeader.class); + String rootTopic = requestHeader.getRootTopic(); + String json = this.mqttInfoStore.getValue(rootTopic); + RemotingCommand response = RemotingCommand.createResponseCommand(GetRootTopic2ClientsResponseHeader.class); + GetRootTopic2ClientsResponseHeader responseHeader = (GetRootTopic2ClientsResponseHeader) response.readCustomHeader(); + if (json != null) { + Set clientsId = GSON.fromJson(json, new TypeToken>() { + }.getType()); + responseHeader.setOperationSuccess(true); + responseHeader.setClientsId(clientsId); + } else { + responseHeader.setOperationSuccess(false); + } + + return response; + } + + private RemotingCommand deleteRootTopic2Client(final RemotingCommand request) throws RemotingCommandException { + DeleteRootTopic2ClientRequestHeader requestHeader = (DeleteRootTopic2ClientRequestHeader) request.decodeCommandCustomHeader(DeleteRootTopic2ClientRequestHeader.class); + String rootTopic = requestHeader.getRootTopic(); + String clientId = requestHeader.getClientId(); + Set clientsId = GSON.fromJson(this.mqttInfoStore.getValue(rootTopic), new TypeToken>() { + }.getType()); + Set clientsIdAfterDelete = clientsId.stream().filter(c -> c != clientId).collect(Collectors.toSet()); + boolean result; + if (clientsIdAfterDelete.size() == 0) { + result = this.mqttInfoStore.deleteData(rootTopic); + } else { + result = this.mqttInfoStore.putData(rootTopic, GSON.toJson(clientsIdAfterDelete)); + } + RemotingCommand response = RemotingCommand.createResponseCommand(DeleteRootTopic2ClientResponseHeader.class); + DeleteRootTopic2ClientResponseHeader responseHeader = (DeleteRootTopic2ClientResponseHeader) response.readCustomHeader(); + responseHeader.setOperationSuccess(result); + return response; + } + + private Set clientsStringToClientsSet(String clientsString) { + Set clients = new HashSet<>(); + Type type = new TypeToken>() { + }.getType(); + clients = GSON.fromJson(clientsString, type); + return clients; + } + + private boolean isMatch(String topicFiter, String topic) { + if (!topicFiter.contains("+") && !topicFiter.contains("#")) { + return topicFiter.equals(topic); + } + String[] filterTopics = topicFiter.split("/"); + String[] actualTopics = topic.split("/"); + + int i = 0; + for (; i < filterTopics.length && i < actualTopics.length; i++) { + if ("+".equals(filterTopics[i])) { + continue; + } + if ("#".equals(filterTopics[i])) { + return true; + } + if (!filterTopics[i].equals(actualTopics[i])) { + return false; + } + } + return i == actualTopics.length; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 32a008c5f37a20b8fdbb0952952e75d18362e231..bb27df6249981244715f1f0f29a7b9d3dde163d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -25,8 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; -import static org.apache.rocketmq.common.SnodeConfig.localHostName; - public class BrokerConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -70,6 +68,11 @@ public class BrokerConfig { private int consumerManageThreadPoolNums = 32; private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors()); + /** + * thread numbers for mqtt message thread pool. + */ + private int mqttMessageThreadPoolNums = 1; + /** * Thread numbers for EndTransactionProcessor */ @@ -90,6 +93,7 @@ public class BrokerConfig { private int consumerManagerThreadPoolQueueCapacity = 1000000; private int heartbeatThreadPoolQueueCapacity = 50000; private int endTransactionPoolQueueCapacity = 100000; + private int mqttThreadPoolQueueCapacity = 10000; private int filterServerNums = 0; @@ -777,4 +781,20 @@ public class BrokerConfig { public void setAclEnable(boolean aclEnable) { this.aclEnable = aclEnable; } + + public int getMqttMessageThreadPoolNums() { + return mqttMessageThreadPoolNums; + } + + public void setMqttMessageThreadPoolNums(int mqttMessageThreadPoolNums) { + this.mqttMessageThreadPoolNums = mqttMessageThreadPoolNums; + } + + public int getMqttThreadPoolQueueCapacity() { + return mqttThreadPoolQueueCapacity; + } + + public void setMqttThreadPoolQueueCapacity(int mqttThreadPoolQueueCapacity) { + this.mqttThreadPoolQueueCapacity = mqttThreadPoolQueueCapacity; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 66c4a406622f7e25b00f20ca6ff2d5426821da35..67f0919ac3af86091d9100c75a8d51c5f2406ff2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -181,4 +181,25 @@ public class RequestCode { public static final int CREATE_RETRY_TOPIC = 355; public static final int MQTT_MESSAGE = 1000; + + public static final int MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED = 1001; + + public static final int MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION = 1002; + + public static final int MQTT_DELETE_CLIENT2SUBSCRIPTION = 1003; + + public static final int MQTT_GET_SNODEADDRESS2CLIENT = 1004; + + public static final int MQTT_CLIENT_UNSUBSRIBE = 1005; + + public static final int MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS = 1006; + + public static final int MQTT_GET_ROOTTOPIC2CLIENTS = 1007; + + public static final int MQTT_DELETE_ROOTTOPIC2CLIENT = 1008; + + public static final int MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID = 1009; + + public static final int MQTT_GET_CLIENT_BY_CLIENTID_ID = 1010; + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..340fa7a3d73ab25f387ff1395750e4f74e5ce7a3 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddOrUpdateClient2SubscriptionRequestHeader implements CommandCustomHeader { + @CFNotNull + private Client client; + @CFNotNull + private Subscription subscription; + + public Client getClient() { + return client; + } + + public void setClient(Client client) { + this.client = client; + } + + public Subscription getSubscription() { + return subscription; + } + + public void setSubscription(Subscription subscription) { + this.subscription = subscription; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..f0c5962ff9a8f4d536eb8a03945993a880ea602a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddOrUpdateClient2SubscriptionResponseHeader implements CommandCustomHeader { + private boolean result; + + public boolean isResult() { + return result; + } + + public void setResult(boolean result) { + this.result = result; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..f1425c7fb4049d3ce1f9e21a370f492e19a35aa9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddOrUpdateRootTopic2ClientsRequestHeader implements CommandCustomHeader { + @CFNotNull + private String rootTopic; + @CFNotNull + private String clientId; + + public String getRootTopic() { + return rootTopic; + } + + public void setRootTopic(String rootTopic) { + this.rootTopic = rootTopic; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..ab5fc67d803cd6f4e6ad7c170e2f6bf8bcfbe3e9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddOrUpdateRootTopic2ClientsResponseHeader implements CommandCustomHeader { + private boolean operationSuccess; + + public boolean isOperationSuccess() { + return operationSuccess; + } + + public void setOperationSuccess(boolean operationSuccess) { + this.operationSuccess = operationSuccess; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..74d56fb481b8058447e871bbe175a27cc62e8136 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import java.util.List; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ClientUnsubscribeRequestHeader implements CommandCustomHeader { + @CFNotNull + private String clientId; + @CFNotNull + private List topics; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public List getTopics() { + return topics; + } + + public void setTopics(List topics) { + this.topics = topics; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..39b3572da4a7f1714c9a0ae4f304d4b0e934b455 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import java.util.Set; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ClientUnsubscribeResponseHeader implements CommandCustomHeader { + private boolean operationSuccess; + private boolean rootTopicDiffExists; + private Set rootTopicsDiff; + + public boolean isOperationSuccess() { + return operationSuccess; + } + + public void setOperationSuccess(boolean operationSuccess) { + this.operationSuccess = operationSuccess; + } + + public boolean isRootTopicDiffExists() { + return rootTopicDiffExists; + } + + public void setRootTopicDiffExists(boolean rootTopicDiffExists) { + this.rootTopicDiffExists = rootTopicDiffExists; + } + + public Set getRootTopicsDiff() { + return rootTopicsDiff; + } + + public void setRootTopicsDiff(Set rootTopicsDiff) { + this.rootTopicsDiff = rootTopicsDiff; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..2136667637329f8d194faf58583cd5679e50e923 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class DeleteClientRequestHeader implements CommandCustomHeader { + @CFNotNull + private String clientId; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..ecaa95b807124332589e77d54e38f98d334141ce --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class DeleteClientResponseHeader implements CommandCustomHeader { + private Subscription subscription; + private boolean operationSuccess; + + public Subscription getSubscription() { + return subscription; + } + + public void setSubscription(Subscription subscription) { + this.subscription = subscription; + } + + public boolean isOperationSuccess() { + return operationSuccess; + } + + public void setOperationSuccess(boolean operationSuccess) { + this.operationSuccess = operationSuccess; + } + + @Override public void checkFields() throws RemotingCommandException { + + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..a94ef5c9d49ecca2eed6742dc005dd5eeec355c5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class DeleteRootTopic2ClientRequestHeader implements CommandCustomHeader { + @CFNotNull + private String rootTopic; + @CFNotNull + private String clientId; + + public String getRootTopic() { + return rootTopic; + } + + public void setRootTopic(String rootTopic) { + this.rootTopic = rootTopic; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..8ee4dc9eb6d7bb0c00e5c46e657108fcc03ad7a4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class DeleteRootTopic2ClientResponseHeader implements CommandCustomHeader { + private boolean operationSuccess; + + public boolean isOperationSuccess() { + return operationSuccess; + } + + public void setOperationSuccess(boolean operationSuccess) { + this.operationSuccess = operationSuccess; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..cf9724259b55864434e36995351f85bd28b70dc9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetClientByClientIdRequestHeader implements CommandCustomHeader { + @CFNotNull + private String clientId; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override public void checkFields() throws RemotingCommandException { + + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..9326bf61fd30b5a8c0330ce131e2807a51490019 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetClientByClientIdResponseHeader implements CommandCustomHeader { + private Client client; + + public Client getClient() { + return client; + } + + public void setClient(Client client) { + this.client = client; + } + + @Override public void checkFields() throws RemotingCommandException { + + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..ed708c6a63722e2d024fdaf4a10bf6e7ea1911dc --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetRootTopic2ClientsRequestHeader implements CommandCustomHeader { + @CFNotNull + private String rootTopic; + + public String getRootTopic() { + return rootTopic; + } + + public void setRootTopic(String rootTopic) { + this.rootTopic = rootTopic; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..96ca54db8021bce4f52ed11af35116de83d43701 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import java.util.Set; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetRootTopic2ClientsResponseHeader implements CommandCustomHeader { + private boolean operationSuccess; + private Set clientsId; + + public boolean isOperationSuccess() { + return operationSuccess; + } + + public Set getClientsId() { + return clientsId; + } + + public void setClientsId(Set clientsId) { + this.clientsId = clientsId; + } + + public void setOperationSuccess(boolean operationSuccess) { + this.operationSuccess = operationSuccess; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..3f00c08b9d85e9f25bb7c5e6912ebe913ee423c9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import java.util.Set; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetSnodeAddress2ClientsRequestHeader implements CommandCustomHeader { + @CFNotNull + private Set clientsId; + @CFNotNull + private String topic; + + public Set getClientsId() { + return clientsId; + } + + public void setClientsId(Set clientsId) { + this.clientsId = clientsId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..63ee2c2daee1d9e87c443a08b8478816d925238e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetSnodeAddress2ClientsResponseHeader implements CommandCustomHeader { + private Map> snodeAddress2Clients; + + public Map> getSnodeAddress2Clients() { + return snodeAddress2Clients; + } + + public void setSnodeAddress2Clients( + Map> snodeAddress2Clients) { + this.snodeAddress2Clients = snodeAddress2Clients; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..dc0ecf6062353198ac6191b394edf077772e22fd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetSubscriptionByClientIdRequestHeader implements CommandCustomHeader { + @CFNotNull + private String clientId; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..4e114723aeb697a066ecbf802b696ae28919f148 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetSubscriptionByClientIdResponseHeader implements CommandCustomHeader { + private Subscription subscription; + + public Subscription getSubscription() { + return subscription; + } + + public void setSubscription(Subscription subscription) { + this.subscription = subscription; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..c835f4bd11d2668d68a5774e9348cb8843a6a638 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class IsClient2SubscriptionPersistedRequestHeader implements CommandCustomHeader { + @CFNotNull + private String clientId; + private boolean cleanSession; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public boolean isCleanSession() { + return cleanSession; + } + + public void setCleanSession(boolean cleanSession) { + this.cleanSession = cleanSession; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..2586bd064adbbfa0eb7466dec4cb3329abffcf5d --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.mqtt; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class IsClient2SubscriptionPersistedResponseHeader implements CommandCustomHeader { + private boolean isPersisted; + + public boolean isPersisted() { + return isPersisted; + } + + public void setPersisted(boolean persisted) { + isPersisted = persisted; + } + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java index 259ca95fc4faf892c5947ed54482f8634db2bce3..9575580e53302786c9dc78a77c045897afb87b8f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java +++ b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java @@ -137,4 +137,10 @@ public interface EnodeService { RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel, final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + + RemotingCommand requestMQTTInfoSync(final RemotingCommand request) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + + CompletableFuture requestMQTTInfoAsync(final RemotingCommand request) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; } diff --git a/mqtt/pom.xml b/mqtt/pom.xml index eee314e48d5da096d286690c37be92b6e52291f9..17a986c762f6fc8ac203827abe2ae72b80569383 100644 --- a/mqtt/pom.xml +++ b/mqtt/pom.xml @@ -95,10 +95,6 @@ io.prometheus simpleclient_hotspot - - ${project.groupId} - rocketmq-broker - org.eclipse.paho org.eclipse.paho.client.mqttv3 diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java index 3d53f5fd48fd706509aa2cca4d9d2af20f41047d..d564c8ca47e6eb407835c763b4d09426959ea377 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java @@ -30,4 +30,8 @@ public class MqttConstant { public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS"; public static final AttributeKey MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); + public static final String ENODE_NAME = "enodeName"; + public static final String PERSIST_SUBSCRIPTION_SUFFIX = "-sub"; + public static final String PERSIST_SNODEADDRESS_SUFFIX = "-sno"; + public static final String PERSIST_CLIENT_SUFFIX = "-cli"; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java index d55400aabe357faddac3fa5180a618d34f4b82bd..9a4252aa8b8f2c77078b60ab9b523aba0db315b6 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java @@ -150,6 +150,8 @@ public class MqttSubscribeMessageHandler implements MessageHandler { } } //TODO update persistent store of topic2Clients and clientId2Subscription + this.defaultMqttMessageProcessor.getPersistService().addOrUpdateClient2Susbscription(client, subscription); + return grantQoss; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..907d29543518400c2465041b3d23ea69a30a3df3 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.persistence; + +import java.util.Map; +import org.apache.rocketmq.mqtt.persistence.service.PersistService; +import org.apache.rocketmq.remoting.util.ServiceProvider; + +public class PersistServiceFactory { + private static PersistServiceFactory instance = new PersistServiceFactory(); + + public static PersistServiceFactory getInstance() { + return instance; + } + + private PersistServiceFactory() { + } + + private static Map paths; + + private static final String SERVICE_LOCATION = "META-INF/service/org.apache.rocketmq.mqtt.PersistService"; + + static { + paths = ServiceProvider.loadPath(SERVICE_LOCATION); + } + + public PersistService createPersistService() { + return ServiceProvider.createInstance(paths.get("persistService"), PersistService.class); + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java new file mode 100644 index 0000000000000000000000000000000000000000..a432642b4ec043647b00f1dda8cd0e13aa79bbb2 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.persistence.rebalance; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter; +import org.apache.rocketmq.common.consistenthash.HashFunction; +import org.apache.rocketmq.common.consistenthash.Node; + +public class AllocatePersistentDataConsistentHash implements AllocatePersistentDataStrategy { + private final int virtualNodeCnt; + private final HashFunction customHashFunction; + + public AllocatePersistentDataConsistentHash() { + this(10); + } + + public AllocatePersistentDataConsistentHash(int virtualNodeCnt) { + this(virtualNodeCnt, null); + } + + public AllocatePersistentDataConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) { + if (virtualNodeCnt < 0) { + throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt); + } + this.virtualNodeCnt = virtualNodeCnt; + this.customHashFunction = customHashFunction; + } + + @Override + public String allocate(String dataKey, Set enodeNames) { + Collection cidNodes = new ArrayList(); + for (String enodeName : enodeNames) { + cidNodes.add(new ClientNode(enodeName)); + } + final ConsistentHashRouter router; //for building hash ring + if (customHashFunction != null) { + router = new ConsistentHashRouter(cidNodes, virtualNodeCnt, customHashFunction); + } else { + router = new ConsistentHashRouter(cidNodes, virtualNodeCnt); + } + + ClientNode clientNode = router.routeNode(dataKey); + if (clientNode != null) { + return clientNode.getKey(); + } + return null; + } + + @Override + public String getName() { + return "CONSISTENT_HASH"; + } + + private static class ClientNode implements Node { + private final String clientID; + + public ClientNode(String clientID) { + this.clientID = clientID; + } + + @Override + public String getKey() { + return clientID; + } + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..9ebc92203426fa67b273fffa3121f68e7d19273f --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.persistence.rebalance; + +import java.util.Set; + +public interface AllocatePersistentDataStrategy { + String allocate(final String dataKey, final Set enodeNames); + + String getName(); + +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java new file mode 100644 index 0000000000000000000000000000000000000000..bca9750456cf0bea9861a3846baf676f279de5be --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.persistence.service; + +import com.google.gson.Gson; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdResponseHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedRequestHeader; +import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedResponseHeader; +import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.constant.MqttConstant; +import org.apache.rocketmq.mqtt.persistence.rebalance.AllocatePersistentDataConsistentHash; +import org.apache.rocketmq.mqtt.persistence.rebalance.AllocatePersistentDataStrategy; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class DefaultPersistService implements PersistService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private EnodeService enodeService; + private AllocatePersistentDataStrategy allocatePersistentDataStrategy; + + private final Gson GSON = new Gson(); + + public DefaultPersistService() { + } + + @Override + public void init(DefaultMqttMessageProcessor defaultMqttMessageProcessor) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; + this.enodeService = defaultMqttMessageProcessor.getEnodeService(); + this.allocatePersistentDataStrategy = new AllocatePersistentDataConsistentHash(); + } + + @Override public boolean isClient2SubsriptionPersisted(Client client) { + + String clientId = client.getClientId(); + String enodeName = this.getAllocateEnodeName(clientId); + boolean cleanSession = ((MQTTSession) client).isCleanSession(); + + IsClient2SubscriptionPersistedRequestHeader requestHeader = new IsClient2SubscriptionPersistedRequestHeader(); + requestHeader.setClientId(clientId); + requestHeader.setCleanSession(cleanSession); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, requestHeader); + request.addExtField(MqttConstant.ENODE_NAME, enodeName); + + try { + RemotingCommand response = enodeService.requestMQTTInfoSync(request); + IsClient2SubscriptionPersistedResponseHeader responseHeader = (IsClient2SubscriptionPersistedResponseHeader) response.decodeCommandCustomHeader(IsClient2SubscriptionPersistedResponseHeader.class); + return responseHeader.isPersisted(); + } catch (Exception e) { + log.error("Transfer MQTT info to Enode: {} failed, Err: {} ", enodeName, e); + } + return false; + } + + @Override public boolean addOrUpdateClient2Susbscription(Client client, Subscription subscription) { + // client2Subscription request + boolean client2SubscriptionResult = false; + String enodeName = this.getAllocateEnodeName(client.getClientId()); + AddOrUpdateClient2SubscriptionRequestHeader requestHeader = new AddOrUpdateClient2SubscriptionRequestHeader(); + requestHeader.setClient(client); + requestHeader.setSubscription(subscription); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, requestHeader); + request.addExtField(MqttConstant.ENODE_NAME, enodeName); + + try { + RemotingCommand response = enodeService.requestMQTTInfoSync(request); + AddOrUpdateClient2SubscriptionResponseHeader responseHeader = (AddOrUpdateClient2SubscriptionResponseHeader) response.decodeCommandCustomHeader(AddOrUpdateClient2SubscriptionResponseHeader.class); + client2SubscriptionResult = responseHeader.isResult(); + } catch (Exception e) { + log.error("Transfer MQTT info to Enode: {} failed, Err: {} ", enodeName, e); + } + + // rootTopic2Clients request + boolean rootTopic2ClientsResult = true; + for (String rootTopic : subscription.getSubscriptionTable().keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).distinct().collect(Collectors.toList())) { + String enodeNameForRootTopic = this.getAllocateEnodeName(rootTopic); + AddOrUpdateRootTopic2ClientsRequestHeader addOrUpdateRootTopic2ClientsRequestHeader = new AddOrUpdateRootTopic2ClientsRequestHeader(); + addOrUpdateRootTopic2ClientsRequestHeader.setRootTopic(rootTopic); + addOrUpdateRootTopic2ClientsRequestHeader.setClientId(client.getClientId()); + RemotingCommand requestForRootTopic = RemotingCommand.createRequestCommand(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, addOrUpdateRootTopic2ClientsRequestHeader); + requestForRootTopic.addExtField(MqttConstant.ENODE_NAME, enodeNameForRootTopic); + try { + AddOrUpdateRootTopic2ClientsResponseHeader responseHeader = (AddOrUpdateRootTopic2ClientsResponseHeader) enodeService.requestMQTTInfoSync(requestForRootTopic).decodeCommandCustomHeader(AddOrUpdateRootTopic2ClientsResponseHeader.class); + rootTopic2ClientsResult = rootTopic2ClientsResult && responseHeader.isOperationSuccess(); + } catch (Exception ex) { + log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, ex); + } + } + + return rootTopic2ClientsResult && client2SubscriptionResult; + } + + @Override public boolean deleteClient(Client client) { + // delete client2subscription and client2snodeAddress + DeleteClientRequestHeader deleteClientRequestHeader = new DeleteClientRequestHeader(); + deleteClientRequestHeader.setClientId(client.getClientId()); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, deleteClientRequestHeader); + String enodeName = this.getAllocateEnodeName(client.getClientId()); + request.addExtField(MqttConstant.ENODE_NAME, enodeName); + RemotingCommand response = null; + try { + response = this.enodeService.requestMQTTInfoSync(request); + } catch (Exception e) { + log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, e); + } + + // delete rootTopic2Clients + if (response != null) { + boolean client2SubResult; + try { + DeleteClientResponseHeader deleteClientResponseHeader = (DeleteClientResponseHeader) response.decodeCommandCustomHeader(DeleteClientResponseHeader.class); + client2SubResult = deleteClientResponseHeader.isOperationSuccess(); + Subscription subscription = deleteClientResponseHeader.getSubscription(); + boolean rootTopic2ClientsResult = true; + for (String rootTopic : subscription.getSubscriptionTable().keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).distinct().collect(Collectors.toList())) { + String enodeNameForRootTopic = this.getAllocateEnodeName(rootTopic); + DeleteRootTopic2ClientRequestHeader deleteRootTopic2ClientRequestHeader = new DeleteRootTopic2ClientRequestHeader(); + deleteRootTopic2ClientRequestHeader.setClientId(client.getClientId()); + deleteRootTopic2ClientRequestHeader.setRootTopic(rootTopic); + request = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, deleteRootTopic2ClientRequestHeader); + request.addExtField(MqttConstant.ENODE_NAME, enodeNameForRootTopic); + try { + DeleteRootTopic2ClientResponseHeader deleteRootTopic2ClientResponseHeader = (DeleteRootTopic2ClientResponseHeader) enodeService.requestMQTTInfoSync(request).decodeCommandCustomHeader(DeleteClientResponseHeader.class); + rootTopic2ClientsResult = rootTopic2ClientsResult && deleteRootTopic2ClientResponseHeader.isOperationSuccess(); + } catch (Exception ex) { + log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, ex); + } + } + return client2SubResult && rootTopic2ClientsResult; + } catch (Exception e) { + log.error("Decode deleteClient response header failed, error:{}", e); + } + } + + return false; + } + + @Override + public Map> getSnodeAddress2Clients(String topic) { + final Map> snodeAddress2Clients = new ConcurrentHashMap<>(); + // step1: get rootTopic2Clients + String rootTopic = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]; + GetRootTopic2ClientsRequestHeader requestHeader = new GetRootTopic2ClientsRequestHeader(); + requestHeader.setRootTopic(rootTopic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, requestHeader); + String enodeName = this.getAllocateEnodeName(rootTopic); + request.addExtField(MqttConstant.ENODE_NAME, enodeName); + try { + RemotingCommand response = this.enodeService.requestMQTTInfoSync(request); + GetRootTopic2ClientsResponseHeader responseHeader = (GetRootTopic2ClientsResponseHeader) response.decodeCommandCustomHeader(GetRootTopic2ClientsResponseHeader.class); + if (responseHeader.isOperationSuccess()) { + + Set clientsId = responseHeader.getClientsId(); + HashMap> enodeName2ClientsIdSet = new HashMap<>(); + for (String clientId : clientsId) { + String enodeNameTmp = this.getAllocateEnodeName(clientId); + if (enodeName2ClientsIdSet.get(enodeNameTmp) == null) { + Set clientsIdTmp = new HashSet<>(); + clientsIdTmp.add(clientId); + enodeName2ClientsIdSet.put(enodeNameTmp, clientsIdTmp); + } else { + enodeName2ClientsIdSet.get(enodeNameTmp).add(clientId); + } + } + // step2: get snodeAddress2ClientsId + final CountDownLatch countDownLatch = new CountDownLatch(enodeName2ClientsIdSet.size()); + for (String enodeNameToSend : enodeName2ClientsIdSet.keySet()) { + GetSnodeAddress2ClientsRequestHeader getSnodeAddress2ClientsRequestHeader = new GetSnodeAddress2ClientsRequestHeader(); + getSnodeAddress2ClientsRequestHeader.setClientsId(enodeName2ClientsIdSet.get(enodeNameToSend)); + getSnodeAddress2ClientsRequestHeader.setTopic(topic); + RemotingCommand requestToSend = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, getSnodeAddress2ClientsRequestHeader); + requestToSend.addExtField(MqttConstant.ENODE_NAME, enodeNameToSend); + CompletableFuture responseFuture = this.enodeService.requestMQTTInfoAsync(requestToSend); + responseFuture.whenComplete((data, ex) -> { + if (ex == null) { + try { + GetSnodeAddress2ClientsResponseHeader getSnodeAddress2ClientsResponseHeader = (GetSnodeAddress2ClientsResponseHeader) data.decodeCommandCustomHeader(GetSnodeAddress2ClientsResponseHeader.class); + Map> snodeAddress2ClientsTmp = getSnodeAddress2ClientsResponseHeader.getSnodeAddress2Clients(); + for (String snodeAddress : snodeAddress2ClientsTmp.keySet()) { + snodeAddress2Clients.getOrDefault(snodeAddress, new HashSet<>()).addAll(snodeAddress2ClientsTmp.get(snodeAddress)); + } + } catch (Exception e) { + log.error("Transfer MQTT snodeAddress2Clients info to Enode: {} failed, Err: {} ", enodeNameToSend, e); + } + + } + countDownLatch.countDown(); + }); + } + countDownLatch.await(); + + } + } catch (Exception e) { + log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, e); + } + return snodeAddress2Clients; + } + + @Override public boolean clientUnsubscribe(Client client, List topics) { + boolean result = false; + // step1: delete client2sub + ClientUnsubscribeRequestHeader requestHeader = new ClientUnsubscribeRequestHeader(); + requestHeader.setClientId(client.getClientId()); + requestHeader.setTopics(topics); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_CLIENT_UNSUBSRIBE, requestHeader); + request.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(client.getClientId())); + try { + RemotingCommand response = this.enodeService.requestMQTTInfoSync(request); + ClientUnsubscribeResponseHeader responseHeader = (ClientUnsubscribeResponseHeader) response.decodeCommandCustomHeader(ClientUnsubscribeResponseHeader.class); + result = responseHeader.isOperationSuccess(); + // step2: delete rootTopic2Clients + if (responseHeader.isRootTopicDiffExists()) { + Set rootTopicsDiff = responseHeader.getRootTopicsDiff(); + for (String rootTopic : rootTopicsDiff) { + DeleteRootTopic2ClientRequestHeader deleteRootTopic2ClientRequestHeader = new DeleteRootTopic2ClientRequestHeader(); + deleteRootTopic2ClientRequestHeader.setRootTopic(rootTopic); + deleteRootTopic2ClientRequestHeader.setClientId(client.getClientId()); + RemotingCommand requestForDeleteRootTopic = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, deleteRootTopic2ClientRequestHeader); + requestForDeleteRootTopic.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(rootTopic)); + try { + this.enodeService.requestMQTTInfoSync(requestForDeleteRootTopic); + } catch (Exception ex) { + log.error("Transfer MQTT rootTopic2Clients info failed, Err: {} ", ex); + } + } + } + } catch (Exception e) { + log.error("Transfer MQTT rootTopic2Clients info failed, Err: {} ", e); + } + + return result; + } + + @Override public Subscription getSubscriptionByClientId(String clientId) { + GetSubscriptionByClientIdRequestHeader requestHeader = new GetSubscriptionByClientIdRequestHeader(); + requestHeader.setClientId(clientId); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, requestHeader); + request.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(clientId)); + try { + RemotingCommand response = this.enodeService.requestMQTTInfoSync(request); + GetSubscriptionByClientIdResponseHeader responseHeader = (GetSubscriptionByClientIdResponseHeader) response.decodeCommandCustomHeader(GetSubscriptionByClientIdResponseHeader.class); + return responseHeader.getSubscription(); + } catch (Exception e) { + log.error("Get Subscription failed, error: {}", e); + } + return null; + } + + @Override public Client getClientByClientId(String clientId) { + GetClientByClientIdRequestHeader requestHeader = new GetClientByClientIdRequestHeader(); + requestHeader.setClientId(clientId); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, requestHeader); + request.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(clientId)); + try { + RemotingCommand response = this.enodeService.requestMQTTInfoSync(request); + GetClientByClientIdResponseHeader responseHeader = (GetClientByClientIdResponseHeader) response.decodeCommandCustomHeader(GetClientByClientIdResponseHeader.class); + return responseHeader.getClient(); + } catch (Exception e) { + log.error("Get Client failed, error: {}", e); + } + return null; + } + + private String getAllocateEnodeName(String key) { + String clusterName = defaultMqttMessageProcessor.getSnodeConfig().getClusterName(); + Set enodeNames = defaultMqttMessageProcessor.getNnodeService().getEnodeClusterInfo(clusterName); + String enodeName = allocatePersistentDataStrategy.allocate(key, enodeNames); + return enodeName; + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java new file mode 100644 index 0000000000000000000000000000000000000000..f25a9a2d640050ed8aaccc1a5c3610118fe7c269 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.persistence.service; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; + +public interface PersistService { + /** + * Init Persist Service + * @param processor MQTT messages processor + */ + void init(DefaultMqttMessageProcessor processor); + + boolean isClient2SubsriptionPersisted(Client client); + + boolean addOrUpdateClient2Susbscription(Client client, Subscription subscription); + + boolean deleteClient(Client client); + + Map> getSnodeAddress2Clients(String topic); + + boolean clientUnsubscribe(Client client, List topics); + + Subscription getSubscriptionByClientId(String clientId); + + Client getClientByClientId(String clientId); +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 0ccb3545e2d1e5c5cea2af0385cbf54d6b64a18c..17d968b7d53b7fd815bdb4f570a20b9d4a07c9c1 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -55,6 +55,8 @@ import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler; import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler; +import org.apache.rocketmq.mqtt.persistence.PersistServiceFactory; +import org.apache.rocketmq.mqtt.persistence.service.PersistService; import org.apache.rocketmq.mqtt.service.WillMessageService; import org.apache.rocketmq.mqtt.service.impl.MqttScheduledServiceImpl; import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; @@ -84,6 +86,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { private EnodeService enodeService; private NnodeService nnodeService; private ScheduledService mqttScheduledService; + private PersistService persistService; private final OrderedExecutor orderedExecutor; @@ -97,6 +100,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { this.mqttRemotingServer = mqttRemotingServer; this.enodeService = enodeService; this.nnodeService = nnodeService; + this.persistService = PersistServiceFactory.getInstance().createPersistService(); + this.persistService.init(this); this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager); this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval()); @@ -214,4 +219,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { public OrderedExecutor getOrderedExecutor() { return orderedExecutor; } + + public PersistService getPersistService() { + return persistService; + } } diff --git a/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService b/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService new file mode 100644 index 0000000000000000000000000000000000000000..f4a44810e9801052ed4f50065506a22e964949eb --- /dev/null +++ b/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService @@ -0,0 +1 @@ +persistService=org.apache.rocketmq.mqtt.persistence.service.DefaultPersistService \ No newline at end of file diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java index 2103d3133af0c2e830e8d6cd7faf62f2dd05f60b..4255f84268a91d9ffb3bbd8d46aec8642da92624 100644 --- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java @@ -127,9 +127,7 @@ public class MqttPublishMessageHandlerTest { TopicRouteData topicRouteData = buildTopicRouteData(); Mockito.when(nnodeService.getTopicRouteDataByTopic(anyString(), anyBoolean())).thenReturn(topicRouteData); CompletableFuture future = new CompletableFuture<>(); -// RemotingCommand response = Mockito.mock(RemotingCommand.class); - Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future); -// doAnswer(mock -> future.complete(response)).when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))); + Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(), anyString(), any(RemotingCommand.class))).thenReturn(future); RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel); assert remotingCommand != null; MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java index 0eb1730a86211e1cbd6dbd814d7f575db9c17923..3c2add0795ff699a9186a3e217e96d7394782a06 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java @@ -35,4 +35,5 @@ public class MqttEncodeDecodeUtil { final String json = new String(body, Charset.forName("UTF-8")); return GSON.fromJson(json, classOfT); } + } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java index 726251df2ffcdcbd8fd51bf40d265eca440a006b..47166d1787fd3469016d1cfd958a19d3b92f6518 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java @@ -20,14 +20,17 @@ import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.common.service.EnodeService; public class LocalEnodeServiceImpl implements EnodeService { @@ -147,4 +150,28 @@ public class LocalEnodeServiceImpl implements EnodeService { log.info("un"); return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request); } + + @Override public RemotingCommand requestMQTTInfoSync( + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + try { + return this.brokerController.getMqttProcessor().processRequest(null, request); + } catch (Exception e) { + log.error("[Local]RequestMQTTInfo failed, error: {}", e); + } + return null; + } + + @Override public CompletableFuture requestMQTTInfoAsync( + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + CompletableFuture completableFuture = new CompletableFuture<>(); + try { + RemotingCommand remotingCommand = this.brokerController.getMqttProcessor().processRequest(null, request); + CodecHelper.encodeHeader(remotingCommand); + completableFuture.complete(remotingCommand); + } catch (Exception ex) { + log.error("[Local]RequestMQTTInfo failed, error: {}", ex); + completableFuture.completeExceptionally(ex); + } + return completableFuture; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java index 66ee7e25c860e94fa76884e96be19b76dde7391f..2e9c9d198595c1b524933219610633e053b12802 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -295,4 +296,16 @@ public class RemoteEnodeServiceImpl implements EnodeService { return this.snodeController.getRemotingClient().invokeSync(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } + + @Override + public RemotingCommand requestMQTTInfoSync( + final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return this.transferToEnode(request); + } + + @Override public CompletableFuture requestMQTTInfoAsync( + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return this.sendMessage(null, request.getExtFields().get(MqttConstant.ENODE_NAME), request); + } + } diff --git a/store/pom.xml b/store/pom.xml index d90e7fbda26dccbd647aaad60a70f3c7363f7f1c..4af4e68821c648c76dc997a665a3ff3d350abeca 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -51,6 +51,11 @@ net.java.dev.jna jna + + org.rocksdb + rocksdbjni + 5.5.1 + ch.qos.logback logback-classic diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java new file mode 100644 index 0000000000000000000000000000000000000000..6861f5f5426300100ad3400c6917b649c3e9830c --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +public class DefaultMQTTInfoStore implements MQTTInfoStore { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private RocksDB db; + private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; + private String storePathRocksDB = storePathRootDir + + File.separator + "RocksDB"; + + @Override public void load() { + RocksDB.loadLibrary(); + } + + @Override public void start() throws Exception { + + try (final Options options = new Options().setCreateIfMissing(true)) { + if (!Files.isSymbolicLink(Paths.get(storePathRocksDB))) { + Files.createDirectories(Paths.get(storePathRocksDB)); + } + db = RocksDB.open(options, storePathRocksDB); + } catch (RocksDBException e) { + log.error("Open RocksDb failed. Error:{}", e); + throw e; + } + + } + + @Override public boolean putData(String key, String value) { + try { + db.put(key.getBytes(), value.getBytes()); + return true; + } catch (Exception e) { + log.error("RocksDB put data failed. Error:{}", e); + return false; + } + } + + @Override public String getValue(String key) { + try { + byte[] value = db.get(key.getBytes()); + if (value != null) { + return new String(value, Charset.forName("UTF-8")); + } else { + return null; + } + } catch (Exception e) { + log.error("RocksDB get value failed. Error:{}", e); + return null; + } + } + + @Override public boolean deleteData(String key) { + boolean result = false; + try { + db.delete(key.getBytes()); + result = true; + } catch (Exception e) { + log.error("RocksDB delete data failed. Error:{}", e); + } + return result; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java b/store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java new file mode 100644 index 0000000000000000000000000000000000000000..6603e61b27bdc8f879363fca3088f9e9c599d85d --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +public interface MQTTInfoStore { + + void load(); + + void start() throws Exception; + + boolean putData(String key, String value); + + String getValue(String key); + + boolean deleteData(String key); +}