提交 5813a8ba 编写于 作者: A Aaron-He 提交者: Heng Du

[RIP-11] Add persistence interface for mqtt session (#1297)

* implements mqtt subscription persistence

* Refactor code follow RocketMQ coding style

* Add quering mqtt client interface & reformat code
上级 53a15b63
......@@ -34,6 +34,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
......@@ -50,6 +51,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-mqtt</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.MQTTProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
......@@ -97,7 +98,9 @@ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.apache.rocketmq.remoting.util.ServiceProvider;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMQTTInfoStore;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MQTTInfoStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
......@@ -139,11 +142,14 @@ public class BrokerController {
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final BlockingQueue<Runnable> mqttThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private MessageStore messageStore;
private MQTTProcessor mqttProcessor;
private MQTTInfoStore mqttInfoStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
......@@ -155,6 +161,7 @@ public class BrokerController {
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private ExecutorService mqttMessageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
......@@ -204,6 +211,7 @@ public class BrokerController {
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.mqttThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getMqttThreadPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
......@@ -319,6 +327,15 @@ public class BrokerController {
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.mqttMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getMqttMessageThreadPoolNums(),
this.brokerConfig.getMqttMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.mqttThreadPoolQueue,
new ThreadFactoryImpl("MQTTThread_")
);
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
......@@ -475,6 +492,14 @@ public class BrokerController {
}
initialTransaction();
}
try {
this.mqttInfoStore = new DefaultMQTTInfoStore();
mqttInfoStore.load();
mqttInfoStore.start();
} catch (Exception e) {
log.error("Open MQTT Database failed, error: {}", e);
}
return result;
}
......@@ -608,6 +633,30 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* MQTTProcessor
*/
mqttProcessor = new MQTTProcessor(this);
this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor);
/**
* Default
*/
......@@ -1243,4 +1292,13 @@ public class BrokerController {
public ConsumerManageProcessor getConsumerManageProcessor() {
return consumerManageProcessor;
}
public MQTTInfoStore getMqttInfoStore() {
return mqttInfoStore;
}
public MQTTProcessor getMqttProcessor() {
return mqttProcessor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdResponseHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedRequestHeader;
import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MQTTInfoStore;
public class MQTTProcessor implements RequestProcessor {
private final BrokerController brokerController;
private static final Gson GSON = new Gson();
private final MQTTInfoStore mqttInfoStore;
public MQTTProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.mqttInfoStore = this.brokerController.getMqttInfoStore();
}
@Override public boolean rejectRequest() {
return false;
}
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED:
return this.isClient2SubscriptionPersistedHandler(remotingChannel, request);
case RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION:
return this.addOrUpdateClient2Subscription(remotingChannel, request);
case RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION:
return this.deleteClient2Subscription(request);
case RequestCode.MQTT_GET_SNODEADDRESS2CLIENT:
return this.getSnodeAddress2Clients(request);
case RequestCode.MQTT_CLIENT_UNSUBSRIBE:
return this.clientUnsubscribe(request);
case RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS:
return this.addorUpdateRootTopic2Clients(request);
case RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS:
return this.getRootTopic2Clients(request);
case RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT:
return this.deleteRootTopic2Client(request);
case RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID:
return this.getSubscriptionByClientId(request);
case RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID:
return this.getClientByClientId(request);
default:
return null;
}
}
private RemotingCommand getClientByClientId(RemotingCommand request) throws RemotingCommandException {
GetClientByClientIdRequestHeader requestHeader = (GetClientByClientIdRequestHeader) request.decodeCommandCustomHeader(GetClientByClientIdRequestHeader.class);
String clientId = requestHeader.getClientId();
Client client = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX), MQTTSession.class);
RemotingCommand response = RemotingCommand.createResponseCommand(GetClientByClientIdResponseHeader.class);
GetClientByClientIdResponseHeader responseHeader = (GetClientByClientIdResponseHeader) response.readCustomHeader();
responseHeader.setClient(client);
return response;
}
private RemotingCommand getSubscriptionByClientId(RemotingCommand request) throws RemotingCommandException {
GetSubscriptionByClientIdRequestHeader requestHeader = (GetSubscriptionByClientIdRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionByClientIdRequestHeader.class);
String clientId = requestHeader.getClientId();
Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class);
RemotingCommand response = RemotingCommand.createResponseCommand(GetSubscriptionByClientIdResponseHeader.class);
GetSubscriptionByClientIdResponseHeader responseHeader = (GetSubscriptionByClientIdResponseHeader) response.readCustomHeader();
responseHeader.setSubscription(subscription);
return response;
}
private RemotingCommand isClient2SubscriptionPersistedHandler(final RemotingChannel remotingChannel,
final RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(IsClient2SubscriptionPersistedResponseHeader.class);
IsClient2SubscriptionPersistedResponseHeader responseHeader = (IsClient2SubscriptionPersistedResponseHeader) response.readCustomHeader();
IsClient2SubscriptionPersistedRequestHeader requestHeader = (IsClient2SubscriptionPersistedRequestHeader) request.decodeCommandCustomHeader(IsClient2SubscriptionPersistedRequestHeader.class);
String clientId = requestHeader.getClientId();
boolean cleanSession = requestHeader.isCleanSession();
String clientJson = mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX);
if (clientJson != null) {
MQTTSession client = GSON.fromJson(clientJson, MQTTSession.class);
if (client.isCleanSession() != cleanSession) {
client.setCleanSession(cleanSession);
mqttInfoStore.putData(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX, GSON.toJson(client));
}
responseHeader.setPersisted(true);
} else {
responseHeader.setPersisted(false);
}
mqttInfoStore.putData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString());
return response;
}
private RemotingCommand addOrUpdateClient2Subscription(final RemotingChannel remotingChannel,
final RemotingCommand request) throws RemotingCommandException {
AddOrUpdateClient2SubscriptionRequestHeader requestHeader = (AddOrUpdateClient2SubscriptionRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateClient2SubscriptionRequestHeader.class);
Client client = requestHeader.getClient();
Subscription subscription = requestHeader.getSubscription();
boolean client2SubResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX, GSON.toJson(subscription));
boolean client2SnoResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString());
boolean client2EntityResult = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_CLIENT_SUFFIX, GSON.toJson(client));
RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateClient2SubscriptionResponseHeader.class);
AddOrUpdateClient2SubscriptionResponseHeader responseHeader = (AddOrUpdateClient2SubscriptionResponseHeader) response.readCustomHeader();
responseHeader.setResult(client2SubResult && client2SnoResult && client2EntityResult);
return response;
}
private RemotingCommand deleteClient2Subscription(final RemotingCommand request) throws RemotingCommandException {
DeleteClientRequestHeader requestHeader = (DeleteClientRequestHeader) request.decodeCommandCustomHeader(DeleteClientRequestHeader.class);
String clientId = requestHeader.getClientId();
String subscriptionJson = this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX);
Subscription subscription = GSON.fromJson(subscriptionJson, Subscription.class);
boolean operationSuccess = this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX) && this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX) && this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX);
RemotingCommand response = RemotingCommand.createResponseCommand(DeleteClientResponseHeader.class);
DeleteClientResponseHeader responseHeader = (DeleteClientResponseHeader) response.readCustomHeader();
responseHeader.setOperationSuccess(operationSuccess);
responseHeader.setSubscription(subscription);
return response;
}
private RemotingCommand getSnodeAddress2Clients(final RemotingCommand request) throws RemotingCommandException {
Map<String, Set<Client>> snodeAddress2Clients = new ConcurrentHashMap<>();
Set<Client> clients = new HashSet<>();
GetSnodeAddress2ClientsRequestHeader requestHeader = (GetSnodeAddress2ClientsRequestHeader) request.decodeCommandCustomHeader(GetSnodeAddress2ClientsRequestHeader.class);
String topic = requestHeader.getTopic();
Set<String> clientsId = requestHeader.getClientsId();
for (String clientId : clientsId) {
ConcurrentHashMap<String/*Topic*/, SubscriptionData> subscriptionTable = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class).getSubscriptionTable();
for (String topicFilter : subscriptionTable.keySet()) {
if (isMatch(topicFilter, topic)) {
clients.add(GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX), MQTTSession.class));
}
}
}
for (Client client : clients) {
String snodeAddress = this.mqttInfoStore.getValue(client.getClientId() + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX);
Set<Client> clientsTmp = snodeAddress2Clients.getOrDefault(snodeAddress, new HashSet<>());
clientsTmp.add(client);
}
RemotingCommand response = RemotingCommand.createResponseCommand(GetSnodeAddress2ClientsResponseHeader.class);
GetSnodeAddress2ClientsResponseHeader responseHeader = (GetSnodeAddress2ClientsResponseHeader) response.readCustomHeader();
responseHeader.setSnodeAddress2Clients(snodeAddress2Clients);
return response;
}
private RemotingCommand clientUnsubscribe(final RemotingCommand request) throws RemotingCommandException {
ClientUnsubscribeRequestHeader requestHeader = (ClientUnsubscribeRequestHeader) request.decodeCommandCustomHeader(ClientUnsubscribeRequestHeader.class);
String clientId = requestHeader.getClientId();
List<String> topics = requestHeader.getTopics();
Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class);
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = subscription.getSubscriptionTable();
Set<String> rootTopicsBefore = subscriptionTable.keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).collect(Collectors.toSet());
for (String topic : topics) {
subscriptionTable.remove(topic);
}
Set<String> rootTopicAfter = subscriptionTable.keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).collect(Collectors.toSet());
Set<String> rootTopicsDiff = new HashSet<>();
rootTopicsDiff.addAll(rootTopicsBefore);
rootTopicsDiff.removeAll(rootTopicAfter);
subscription.setSubscriptionTable(subscriptionTable);
boolean result = this.mqttInfoStore.putData(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX, GSON.toJson(subscription));
RemotingCommand response = RemotingCommand.createResponseCommand(ClientUnsubscribeResponseHeader.class);
ClientUnsubscribeResponseHeader responseHeader = (ClientUnsubscribeResponseHeader) response.readCustomHeader();
responseHeader.setOperationSuccess(result);
if (rootTopicsDiff.size() != 0) {
responseHeader.setRootTopicDiffExists(true);
responseHeader.setRootTopicsDiff(rootTopicsDiff);
}
return response;
}
private RemotingCommand addorUpdateRootTopic2Clients(
final RemotingCommand request) throws RemotingCommandException {
AddOrUpdateRootTopic2ClientsRequestHeader requestHeader = (AddOrUpdateRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateRootTopic2ClientsRequestHeader.class);
String rootTopic = requestHeader.getRootTopic();
String clientId = requestHeader.getClientId();
String value = this.mqttInfoStore.getValue(rootTopic);
Set<String> clientsId;
if (value != null) {
clientsId = GSON.fromJson(value, new TypeToken<Set<String>>() {
}.getType());
} else {
clientsId = new HashSet<>();
}
clientsId.add(clientId);
RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateRootTopic2ClientsResponseHeader.class);
AddOrUpdateRootTopic2ClientsResponseHeader responseHeader = (AddOrUpdateRootTopic2ClientsResponseHeader) response.readCustomHeader();
responseHeader.setOperationSuccess(this.mqttInfoStore.putData(rootTopic, GSON.toJson(clientsId)));
return response;
}
private RemotingCommand getRootTopic2Clients(final RemotingCommand request) throws RemotingCommandException {
GetRootTopic2ClientsRequestHeader requestHeader = (GetRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(GetRootTopic2ClientsRequestHeader.class);
String rootTopic = requestHeader.getRootTopic();
String json = this.mqttInfoStore.getValue(rootTopic);
RemotingCommand response = RemotingCommand.createResponseCommand(GetRootTopic2ClientsResponseHeader.class);
GetRootTopic2ClientsResponseHeader responseHeader = (GetRootTopic2ClientsResponseHeader) response.readCustomHeader();
if (json != null) {
Set<String> clientsId = GSON.fromJson(json, new TypeToken<Set<String>>() {
}.getType());
responseHeader.setOperationSuccess(true);
responseHeader.setClientsId(clientsId);
} else {
responseHeader.setOperationSuccess(false);
}
return response;
}
private RemotingCommand deleteRootTopic2Client(final RemotingCommand request) throws RemotingCommandException {
DeleteRootTopic2ClientRequestHeader requestHeader = (DeleteRootTopic2ClientRequestHeader) request.decodeCommandCustomHeader(DeleteRootTopic2ClientRequestHeader.class);
String rootTopic = requestHeader.getRootTopic();
String clientId = requestHeader.getClientId();
Set<String> clientsId = GSON.fromJson(this.mqttInfoStore.getValue(rootTopic), new TypeToken<Set<String>>() {
}.getType());
Set<String> clientsIdAfterDelete = clientsId.stream().filter(c -> c != clientId).collect(Collectors.toSet());
boolean result;
if (clientsIdAfterDelete.size() == 0) {
result = this.mqttInfoStore.deleteData(rootTopic);
} else {
result = this.mqttInfoStore.putData(rootTopic, GSON.toJson(clientsIdAfterDelete));
}
RemotingCommand response = RemotingCommand.createResponseCommand(DeleteRootTopic2ClientResponseHeader.class);
DeleteRootTopic2ClientResponseHeader responseHeader = (DeleteRootTopic2ClientResponseHeader) response.readCustomHeader();
responseHeader.setOperationSuccess(result);
return response;
}
private Set<Client> clientsStringToClientsSet(String clientsString) {
Set<Client> clients = new HashSet<>();
Type type = new TypeToken<Set<Client>>() {
}.getType();
clients = GSON.fromJson(clientsString, type);
return clients;
}
private boolean isMatch(String topicFiter, String topic) {
if (!topicFiter.contains("+") && !topicFiter.contains("#")) {
return topicFiter.equals(topic);
}
String[] filterTopics = topicFiter.split("/");
String[] actualTopics = topic.split("/");
int i = 0;
for (; i < filterTopics.length && i < actualTopics.length; i++) {
if ("+".equals(filterTopics[i])) {
continue;
}
if ("#".equals(filterTopics[i])) {
return true;
}
if (!filterTopics[i].equals(actualTopics[i])) {
return false;
}
}
return i == actualTopics.length;
}
}
......@@ -25,8 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.common.SnodeConfig.localHostName;
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
......@@ -70,6 +68,11 @@ public class BrokerConfig {
private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
/**
* thread numbers for mqtt message thread pool.
*/
private int mqttMessageThreadPoolNums = 1;
/**
* Thread numbers for EndTransactionProcessor
*/
......@@ -90,6 +93,7 @@ public class BrokerConfig {
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
private int mqttThreadPoolQueueCapacity = 10000;
private int filterServerNums = 0;
......@@ -777,4 +781,20 @@ public class BrokerConfig {
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public int getMqttMessageThreadPoolNums() {
return mqttMessageThreadPoolNums;
}
public void setMqttMessageThreadPoolNums(int mqttMessageThreadPoolNums) {
this.mqttMessageThreadPoolNums = mqttMessageThreadPoolNums;
}
public int getMqttThreadPoolQueueCapacity() {
return mqttThreadPoolQueueCapacity;
}
public void setMqttThreadPoolQueueCapacity(int mqttThreadPoolQueueCapacity) {
this.mqttThreadPoolQueueCapacity = mqttThreadPoolQueueCapacity;
}
}
......@@ -181,4 +181,25 @@ public class RequestCode {
public static final int CREATE_RETRY_TOPIC = 355;
public static final int MQTT_MESSAGE = 1000;
public static final int MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED = 1001;
public static final int MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION = 1002;
public static final int MQTT_DELETE_CLIENT2SUBSCRIPTION = 1003;
public static final int MQTT_GET_SNODEADDRESS2CLIENT = 1004;
public static final int MQTT_CLIENT_UNSUBSRIBE = 1005;
public static final int MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS = 1006;
public static final int MQTT_GET_ROOTTOPIC2CLIENTS = 1007;
public static final int MQTT_DELETE_ROOTTOPIC2CLIENT = 1008;
public static final int MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID = 1009;
public static final int MQTT_GET_CLIENT_BY_CLIENTID_ID = 1010;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddOrUpdateClient2SubscriptionRequestHeader implements CommandCustomHeader {
@CFNotNull
private Client client;
@CFNotNull
private Subscription subscription;
public Client getClient() {
return client;
}
public void setClient(Client client) {
this.client = client;
}
public Subscription getSubscription() {
return subscription;
}
public void setSubscription(Subscription subscription) {
this.subscription = subscription;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddOrUpdateClient2SubscriptionResponseHeader implements CommandCustomHeader {
private boolean result;
public boolean isResult() {
return result;
}
public void setResult(boolean result) {
this.result = result;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddOrUpdateRootTopic2ClientsRequestHeader implements CommandCustomHeader {
@CFNotNull
private String rootTopic;
@CFNotNull
private String clientId;
public String getRootTopic() {
return rootTopic;
}
public void setRootTopic(String rootTopic) {
this.rootTopic = rootTopic;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddOrUpdateRootTopic2ClientsResponseHeader implements CommandCustomHeader {
private boolean operationSuccess;
public boolean isOperationSuccess() {
return operationSuccess;
}
public void setOperationSuccess(boolean operationSuccess) {
this.operationSuccess = operationSuccess;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import java.util.List;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ClientUnsubscribeRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientId;
@CFNotNull
private List<String> topics;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public List<String> getTopics() {
return topics;
}
public void setTopics(List<String> topics) {
this.topics = topics;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import java.util.Set;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ClientUnsubscribeResponseHeader implements CommandCustomHeader {
private boolean operationSuccess;
private boolean rootTopicDiffExists;
private Set<String> rootTopicsDiff;
public boolean isOperationSuccess() {
return operationSuccess;
}
public void setOperationSuccess(boolean operationSuccess) {
this.operationSuccess = operationSuccess;
}
public boolean isRootTopicDiffExists() {
return rootTopicDiffExists;
}
public void setRootTopicDiffExists(boolean rootTopicDiffExists) {
this.rootTopicDiffExists = rootTopicDiffExists;
}
public Set<String> getRootTopicsDiff() {
return rootTopicsDiff;
}
public void setRootTopicsDiff(Set<String> rootTopicsDiff) {
this.rootTopicsDiff = rootTopicsDiff;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class DeleteClientRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientId;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class DeleteClientResponseHeader implements CommandCustomHeader {
private Subscription subscription;
private boolean operationSuccess;
public Subscription getSubscription() {
return subscription;
}
public void setSubscription(Subscription subscription) {
this.subscription = subscription;
}
public boolean isOperationSuccess() {
return operationSuccess;
}
public void setOperationSuccess(boolean operationSuccess) {
this.operationSuccess = operationSuccess;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class DeleteRootTopic2ClientRequestHeader implements CommandCustomHeader {
@CFNotNull
private String rootTopic;
@CFNotNull
private String clientId;
public String getRootTopic() {
return rootTopic;
}
public void setRootTopic(String rootTopic) {
this.rootTopic = rootTopic;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class DeleteRootTopic2ClientResponseHeader implements CommandCustomHeader {
private boolean operationSuccess;
public boolean isOperationSuccess() {
return operationSuccess;
}
public void setOperationSuccess(boolean operationSuccess) {
this.operationSuccess = operationSuccess;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetClientByClientIdRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientId;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetClientByClientIdResponseHeader implements CommandCustomHeader {
private Client client;
public Client getClient() {
return client;
}
public void setClient(Client client) {
this.client = client;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetRootTopic2ClientsRequestHeader implements CommandCustomHeader {
@CFNotNull
private String rootTopic;
public String getRootTopic() {
return rootTopic;
}
public void setRootTopic(String rootTopic) {
this.rootTopic = rootTopic;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import java.util.Set;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetRootTopic2ClientsResponseHeader implements CommandCustomHeader {
private boolean operationSuccess;
private Set<String> clientsId;
public boolean isOperationSuccess() {
return operationSuccess;
}
public Set<String> getClientsId() {
return clientsId;
}
public void setClientsId(Set<String> clientsId) {
this.clientsId = clientsId;
}
public void setOperationSuccess(boolean operationSuccess) {
this.operationSuccess = operationSuccess;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import java.util.Set;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetSnodeAddress2ClientsRequestHeader implements CommandCustomHeader {
@CFNotNull
private Set<String> clientsId;
@CFNotNull
private String topic;
public Set<String> getClientsId() {
return clientsId;
}
public void setClientsId(Set<String> clientsId) {
this.clientsId = clientsId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetSnodeAddress2ClientsResponseHeader implements CommandCustomHeader {
private Map<String, Set<Client>> snodeAddress2Clients;
public Map<String, Set<Client>> getSnodeAddress2Clients() {
return snodeAddress2Clients;
}
public void setSnodeAddress2Clients(
Map<String, Set<Client>> snodeAddress2Clients) {
this.snodeAddress2Clients = snodeAddress2Clients;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetSubscriptionByClientIdRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientId;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetSubscriptionByClientIdResponseHeader implements CommandCustomHeader {
private Subscription subscription;
public Subscription getSubscription() {
return subscription;
}
public void setSubscription(Subscription subscription) {
this.subscription = subscription;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class IsClient2SubscriptionPersistedRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientId;
private boolean cleanSession;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.mqtt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class IsClient2SubscriptionPersistedResponseHeader implements CommandCustomHeader {
private boolean isPersisted;
public boolean isPersisted() {
return isPersisted;
}
public void setPersisted(boolean persisted) {
isPersisted = persisted;
}
@Override public void checkFields() throws RemotingCommandException {
}
}
......@@ -137,4 +137,10 @@ public interface EnodeService {
RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel,
final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
RemotingCommand requestMQTTInfoSync(final RemotingCommand request)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
CompletableFuture<RemotingCommand> requestMQTTInfoAsync(final RemotingCommand request)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
}
......@@ -95,10 +95,6 @@
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
......
......@@ -30,4 +30,8 @@ public class MqttConstant {
public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS";
public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
public static final String ENODE_NAME = "enodeName";
public static final String PERSIST_SUBSCRIPTION_SUFFIX = "-sub";
public static final String PERSIST_SNODEADDRESS_SUFFIX = "-sno";
public static final String PERSIST_CLIENT_SUFFIX = "-cli";
}
......@@ -150,6 +150,8 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
}
}
//TODO update persistent store of topic2Clients and clientId2Subscription
this.defaultMqttMessageProcessor.getPersistService().addOrUpdateClient2Susbscription(client, subscription);
return grantQoss;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.persistence;
import java.util.Map;
import org.apache.rocketmq.mqtt.persistence.service.PersistService;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class PersistServiceFactory {
private static PersistServiceFactory instance = new PersistServiceFactory();
public static PersistServiceFactory getInstance() {
return instance;
}
private PersistServiceFactory() {
}
private static Map<String, String> paths;
private static final String SERVICE_LOCATION = "META-INF/service/org.apache.rocketmq.mqtt.PersistService";
static {
paths = ServiceProvider.loadPath(SERVICE_LOCATION);
}
public PersistService createPersistService() {
return ServiceProvider.createInstance(paths.get("persistService"), PersistService.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.persistence.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
public class AllocatePersistentDataConsistentHash implements AllocatePersistentDataStrategy {
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
public AllocatePersistentDataConsistentHash() {
this(10);
}
public AllocatePersistentDataConsistentHash(int virtualNodeCnt) {
this(virtualNodeCnt, null);
}
public AllocatePersistentDataConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
if (virtualNodeCnt < 0) {
throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
}
this.virtualNodeCnt = virtualNodeCnt;
this.customHashFunction = customHashFunction;
}
@Override
public String allocate(String dataKey, Set<String> enodeNames) {
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String enodeName : enodeNames) {
cidNodes.add(new ClientNode(enodeName));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
ClientNode clientNode = router.routeNode(dataKey);
if (clientNode != null) {
return clientNode.getKey();
}
return null;
}
@Override
public String getName() {
return "CONSISTENT_HASH";
}
private static class ClientNode implements Node {
private final String clientID;
public ClientNode(String clientID) {
this.clientID = clientID;
}
@Override
public String getKey() {
return clientID;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.persistence.rebalance;
import java.util.Set;
public interface AllocatePersistentDataStrategy {
String allocate(final String dataKey, final Set<String> enodeNames);
String getName();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.persistence.service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
public interface PersistService {
/**
* Init Persist Service
* @param processor MQTT messages processor
*/
void init(DefaultMqttMessageProcessor processor);
boolean isClient2SubsriptionPersisted(Client client);
boolean addOrUpdateClient2Susbscription(Client client, Subscription subscription);
boolean deleteClient(Client client);
Map<String, Set<Client>> getSnodeAddress2Clients(String topic);
boolean clientUnsubscribe(Client client, List<String> topics);
Subscription getSubscriptionByClientId(String clientId);
Client getClientByClientId(String clientId);
}
......@@ -55,6 +55,8 @@ import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler;
import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.mqtt.persistence.PersistServiceFactory;
import org.apache.rocketmq.mqtt.persistence.service.PersistService;
import org.apache.rocketmq.mqtt.service.WillMessageService;
import org.apache.rocketmq.mqtt.service.impl.MqttScheduledServiceImpl;
import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
......@@ -84,6 +86,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
private EnodeService enodeService;
private NnodeService nnodeService;
private ScheduledService mqttScheduledService;
private PersistService persistService;
private final OrderedExecutor orderedExecutor;
......@@ -97,6 +100,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
this.mqttRemotingServer = mqttRemotingServer;
this.enodeService = enodeService;
this.nnodeService = nnodeService;
this.persistService = PersistServiceFactory.getInstance().createPersistService();
this.persistService.init(this);
this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager);
this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval());
......@@ -214,4 +219,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
public OrderedExecutor getOrderedExecutor() {
return orderedExecutor;
}
public PersistService getPersistService() {
return persistService;
}
}
persistService=org.apache.rocketmq.mqtt.persistence.service.DefaultPersistService
\ No newline at end of file
......@@ -127,9 +127,7 @@ public class MqttPublishMessageHandlerTest {
TopicRouteData topicRouteData = buildTopicRouteData();
Mockito.when(nnodeService.getTopicRouteDataByTopic(anyString(), anyBoolean())).thenReturn(topicRouteData);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
// RemotingCommand response = Mockito.mock(RemotingCommand.class);
Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future);
// doAnswer(mock -> future.complete(response)).when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class)));
Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(), anyString(), any(RemotingCommand.class))).thenReturn(future);
RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel);
assert remotingCommand != null;
MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
......
......@@ -35,4 +35,5 @@ public class MqttEncodeDecodeUtil {
final String json = new String(body, Charset.forName("UTF-8"));
return GSON.fromJson(json, classOfT);
}
}
......@@ -20,14 +20,17 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.common.service.EnodeService;
public class LocalEnodeServiceImpl implements EnodeService {
......@@ -147,4 +150,28 @@ public class LocalEnodeServiceImpl implements EnodeService {
log.info("un");
return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
}
@Override public RemotingCommand requestMQTTInfoSync(
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
try {
return this.brokerController.getMqttProcessor().processRequest(null, request);
} catch (Exception e) {
log.error("[Local]RequestMQTTInfo failed, error: {}", e);
}
return null;
}
@Override public CompletableFuture<RemotingCommand> requestMQTTInfoAsync(
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
try {
RemotingCommand remotingCommand = this.brokerController.getMqttProcessor().processRequest(null, request);
CodecHelper.encodeHeader(remotingCommand);
completableFuture.complete(remotingCommand);
} catch (Exception ex) {
log.error("[Local]RequestMQTTInfo failed, error: {}", ex);
completableFuture.completeExceptionally(ex);
}
return completableFuture;
}
}
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -295,4 +296,16 @@ public class RemoteEnodeServiceImpl implements EnodeService {
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
@Override
public RemotingCommand requestMQTTInfoSync(
final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.transferToEnode(request);
}
@Override public CompletableFuture<RemotingCommand> requestMQTTInfoAsync(
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.sendMessage(null, request.getExtFields().get(MqttConstant.ENODE_NAME), request);
}
}
......@@ -51,6 +51,11 @@
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
public class DefaultMQTTInfoStore implements MQTTInfoStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private RocksDB db;
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
private String storePathRocksDB = storePathRootDir
+ File.separator + "RocksDB";
@Override public void load() {
RocksDB.loadLibrary();
}
@Override public void start() throws Exception {
try (final Options options = new Options().setCreateIfMissing(true)) {
if (!Files.isSymbolicLink(Paths.get(storePathRocksDB))) {
Files.createDirectories(Paths.get(storePathRocksDB));
}
db = RocksDB.open(options, storePathRocksDB);
} catch (RocksDBException e) {
log.error("Open RocksDb failed. Error:{}", e);
throw e;
}
}
@Override public boolean putData(String key, String value) {
try {
db.put(key.getBytes(), value.getBytes());
return true;
} catch (Exception e) {
log.error("RocksDB put data failed. Error:{}", e);
return false;
}
}
@Override public String getValue(String key) {
try {
byte[] value = db.get(key.getBytes());
if (value != null) {
return new String(value, Charset.forName("UTF-8"));
} else {
return null;
}
} catch (Exception e) {
log.error("RocksDB get value failed. Error:{}", e);
return null;
}
}
@Override public boolean deleteData(String key) {
boolean result = false;
try {
db.delete(key.getBytes());
result = true;
} catch (Exception e) {
log.error("RocksDB delete data failed. Error:{}", e);
}
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
public interface MQTTInfoStore {
void load();
void start() throws Exception;
boolean putData(String key, String value);
String getValue(String key);
boolean deleteData(String key);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册