diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
index f060ad8e446bfe97a2e568392cd6442727c12f9b..6f378b2274a96962aabc7663da9464a08e4538ee 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java
@@ -51,7 +51,7 @@ import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultMQPullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 6c910ae0172450284f6c07a6196c735dac8953ca..7e03f000b3b3331d5fcaca4af7152b4a96939c55 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -73,7 +73,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
diff --git a/common/pom.xml b/common/pom.xml
index 684b4135bfb91bfab6950218bd35aa69996821ae..cced26d0d89f423954bb214f518d9576bd6e3fda 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -46,7 +46,7 @@
- org.apache.rocketmq
+ ${project.groupId}
rocketmq-remoting
@@ -54,5 +54,5 @@
snakeyaml
-
+
diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
index 2966a3ba72a6487d04e420e672c3a7dd85be468b..72dcaae4e47270cf0fce501ef4456d355d764c3e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
@@ -60,7 +60,6 @@ public class MqttConfig {
this.listenPort = listenPort;
}
-
public boolean isAclEnable() {
return aclEnable;
}
diff --git a/mqtt/pom.xml b/mqtt/pom.xml
index 01b19b8278fe43c6cfa17c2473ee17fda6691f64..13678904fe3ec4aa476aa2da73459260c92dcc67 100644
--- a/mqtt/pom.xml
+++ b/mqtt/pom.xml
@@ -99,5 +99,9 @@
${project.groupId}
rocketmq-broker
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
index 5f56d16b041c910f82f2ab200f6581992a19cd7b..fd0507bb10acf6dbdbfb3642f4fe160725eb84eb 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.mqtt.client;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -25,10 +26,10 @@ import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManagerImpl;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.eclipse.paho.client.mqttv3.MqttClient;
public class IOTClientManagerImpl extends ClientManagerImpl {
@@ -36,9 +37,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOT_GROUP = "IOT_GROUP";
- private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>(
+ // private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>(
+// 1024);
+ private final ConcurrentHashMap> topic2Clients = new ConcurrentHashMap<>(
1024);
private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024);
+ private final Map snode2MqttClient = new HashMap<>();
public IOTClientManagerImpl() {
}
@@ -78,7 +82,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
}
public void cleanSessionState(String clientId) {
- clientId2Subscription.remove(clientId);
+/* clientId2Subscription.remove(clientId);
for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry>> next = iterator.next();
for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
@@ -91,7 +95,18 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
if (next.getValue() == null || next.getValue().size() == 0) {
iterator.remove();
}
+ }*/
+ for (Iterator>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) {
+ Map.Entry> next = iterator.next();
+ Iterator iterator1 = next.getValue().iterator();
+ while (iterator1.hasNext()) {
+ if (iterator1.next().getClientId().equals(clientId)) {
+ iterator1.remove();
+ }
+ }
}
+ clientId2Subscription.remove(clientId);
+
//remove offline messages
}
@@ -99,8 +114,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
return clientId2Subscription.get(clientId);
}
- public ConcurrentHashMap>> getTopic2SubscriptionTable() {
- return topic2SubscriptionTable;
+ /* public ConcurrentHashMap>> getTopic2SubscriptionTable() {
+ return topic2SubscriptionTable;
+ }*/
+
+ public ConcurrentHashMap> getTopic2Clients() {
+ return topic2Clients;
}
public ConcurrentHashMap getClientId2Subscription() {
@@ -110,4 +129,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void initSubscription(String clientId, Subscription subscription) {
clientId2Subscription.put(clientId, subscription);
}
+
+ public Map getSnode2MqttClient() {
+ return snode2MqttClient;
+ }
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
index 2598fdb474c578a4ffd29cd36ae900af56824438..6d737802e37a2de07b08db247eb7ab434eb7534e 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
@@ -17,13 +17,25 @@
package org.apache.rocketmq.mqtt.mqtthandler;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.exception.MQClientException;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public interface MessageHandler {
@@ -32,5 +44,46 @@ public interface MessageHandler {
*
* @param message
*/
- RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+ RemotingCommand handleMessage(MqttMessage message,
+ RemotingChannel remotingChannel) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+ default Set findCurrentNodeClientsTobePublish(String topic, IOTClientManagerImpl iotClientManager) {
+ //find those clients publishing the message to
+ ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients();
+ ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription();
+ Set clientsTobePush = new HashSet<>();
+ if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) {
+ Set clients = topic2Clients.get(MqttUtil.getRootTopic(topic));
+ for (Client client : clients) {
+ Subscription subscription = clientId2Subscription.get(client.getClientId());
+ Enumeration keys = subscription.getSubscriptionTable().keys();
+ while (keys.hasMoreElements()) {
+ String topicFilter = keys.nextElement();
+ if (MqttUtil.isMatch(topicFilter, topic)) {
+ clientsTobePush.add(client);
+ }
+ }
+ }
+ }
+ return clientsTobePush;
+ }
+
+ default RemotingCommand doResponse(MqttFixedHeader fixedHeader) {
+ if (fixedHeader.qosLevel().value() > 0) {
+ RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
+ MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
+ if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
+ mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
+ mqttHeader.setDup(false);
+ mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
+ mqttHeader.setRetain(false);
+ mqttHeader.setRemainingLength(2);
+ mqttHeader.setPacketId(0);
+ } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
+ //PUBREC/PUBREL/PUBCOMP
+ }
+ return command;
+ }
+ return null;
+ }
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
index 7cc9d0299cf3f028c4ef0067fb740d3d1c137795..5718fadb83f96bff4e69a59f74ed354f5cbca577 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
@@ -17,21 +17,29 @@
package org.apache.rocketmq.mqtt.mqtthandler.impl;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import java.util.Set;
+import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
-import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MqttMessageForwarder implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
- private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+ private final InnerMqttMessageProcessor innerMqttMessageProcessor;
- public MqttMessageForwarder(DefaultMqttMessageProcessor processor) {
- this.defaultMqttMessageProcessor = processor;
+ public MqttMessageForwarder(InnerMqttMessageProcessor processor) {
+ this.innerMqttMessageProcessor = processor;
}
/**
@@ -41,6 +49,17 @@ public class MqttMessageForwarder implements MessageHandler {
* @return whether the message is handled successfully
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
- return null;
+ MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
+ MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
+ MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
+ if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
+ ByteBuf payload = mqttPublishMessage.payload();
+ //Publish message to clients
+ Set clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.innerMqttMessageProcessor.getIotClientManager());
+ innerMqttMessageProcessor.getDefaultMqttMessageProcessor().getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload, clientsTobePublish);
+ }else if(fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)){
+ //TODO
+ }
+ return doResponse(fixedHeader);
}
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
index 43cea3fc9616f44a40c1747580113041f22b8ed7..a8900ce3d6bb28978f42a7df304bc0adc241606f 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
@@ -23,13 +23,21 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.util.ReferenceCountUtil;
+import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.MqttConfig;
+import org.apache.rocketmq.common.SnodeConfig;
+import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.exception.MQClientException;
@@ -37,14 +45,22 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.service.EnodeService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.client.MQTTSession;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
@@ -56,13 +72,21 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublishMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+ private IOTClientManagerImpl iotClientManager;
+ private EnodeService enodeService;
public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) {
this.defaultMqttMessageProcessor = processor;
+ this.iotClientManager = (IOTClientManagerImpl) processor.getIotClientManager();
+ this.enodeService = this.defaultMqttMessageProcessor.getEnodeService();
}
@Override
@@ -82,88 +106,178 @@ public class MqttPublishMessageHandler implements MessageHandler {
}
ByteBuf payload = mqttPublishMessage.payload();
- if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
- defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
- } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
- // Store msg and invoke callback to publish msg to subscribers
- // 1. Check if the root topic has been created
- String rootTopic = MqttUtil.getRootTopic(variableHeader.topicName());
- TopicRouteData topicRouteData = null;
-
- try {
- topicRouteData = this.defaultMqttMessageProcessor.getNnodeService().getTopicRouteDataByTopic(rootTopic, false);
- } catch (MQClientException e) {
- log.error("The rootTopic {} does not exist. Please create it first.", rootTopic);
- throw new MQClientException(e.getResponseCode(), e.getErrorMessage());
- }
+ MqttHeader mqttHeader = new MqttHeader();
+ mqttHeader.setTopicName(variableHeader.topicName());
+ mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
+ mqttHeader.setDup(false);
+ mqttHeader.setQosLevel(fixedHeader.qosLevel().value());
+ mqttHeader.setRetain(false); //set to false tempararily, need to be implemented.
+ switch (fixedHeader.qosLevel()) {
+ case AT_MOST_ONCE:
+ //For clients connected to the current snode and isConnected is true
+ Set clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), this.iotClientManager);
+
+ for (Client client : clientsTobePublish) {
+ ((MQTTSession) client).pushMessageAtQos(mqttHeader, payload, this.defaultMqttMessageProcessor);
+ }
+
+ //For clients that connected to other snodes, transfer the message to them
+ //pseudo code:
+ //step1. find clients that match the publish topic
+ //step2. remove the clients that connected to the current node(clientsTobePublish)
+ //step2. get snode ips by clients in step2.
+ Set snodeIpsTobeTransfer = new HashSet<>();
+ try {
+ transferMessage(snodeIpsTobeTransfer, variableHeader.topicName(), payload);
+ } catch (MqttException e) {
+ log.error("Transfer message failed: {}", e.getMessage());
+ } finally {
+ ReferenceCountUtil.release(message);
+ }
+ case AT_LEAST_ONCE:
+ // Store msg and invoke callback to publish msg to subscribers
+ // 1. Check if the root topic has been created
+ String rootTopic = MqttUtil.getRootTopic(variableHeader.topicName());
+ TopicRouteData topicRouteData = null;
+
+ try {
+ topicRouteData = this.defaultMqttMessageProcessor.getNnodeService().getTopicRouteDataByTopic(rootTopic, false);
+ } catch (MQClientException e) {
+ log.error("The rootTopic {} does not exist. Please create it first.", rootTopic);
+ throw new MQClientException(e.getResponseCode(), e.getErrorMessage());
+ }
+
+ //2. Store msg
+ List datas = topicRouteData.getBrokerDatas();
+ //select a broker randomly, need to be optimized here
+ BrokerData brokerData = datas.get(new Random().nextInt(datas.size()));
+ RemotingCommand request = createSendMessageRequest(rootTopic, fixedHeader, variableHeader, payload, brokerData.getBrokerName());
+ CompletableFuture responseFuture = this.defaultMqttMessageProcessor.getEnodeService().sendMessage(null, brokerData.getBrokerName(), request);
+ responseFuture.whenComplete((data, ex) -> {
+ if (ex == null) {
+ //publish msg to subscribers
+ try {
+ SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) data.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ //find clients that subscribed this topic from all snodes and put it to map.
+ Map> snodeAddr2Clients = new HashMap<>();
+
+ //for clientIds connected to current snode, trigger the logic of push message
+ List clients = snodeAddr2Clients.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1());
+ for (Client client : clients) {
+ Subscription subscription = this.iotClientManager.getSubscriptionByClientId(client.getClientId());
+ ConcurrentHashMap subscriptionTable = subscription.getSubscriptionTable();
+
+ //for each client, wrap a task: pull messages from commitlog one by one, and push them if current client subscribe it.
+ Runnable task = new Runnable() {
+
+ @Override
+ public void run() {
+ //compare current consumeOffset of rootTopic@clientId with maxOffset, pull message if consumeOffset < maxOffset
+ long maxOffsetInQueue;
+ try {
+ maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
+ long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0);
+ long i = consumeOffset;
+ while (i < maxOffsetInQueue) {
+ //TODO query messages from enode
+ RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+ MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
+ final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
+
+ boolean needSkip = needSkip(realTopic);
+ if (needSkip) {
+ log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic);
+ maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
+ i += 1;
+ continue;
+ }
+ Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS)));
+ mqttHeader.setQosLevel(pushQos);
+ //push message
+ MQTTSession mqttSession = (MQTTSession) client;
+ mqttSession.pushMessageAtQos(mqttHeader, payload, defaultMqttMessageProcessor);
+
+ maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic);
+ i += 1;
+ }
+ } catch (Exception ex) {
+ log.error("Get max offset error, remoting: {} error: {} ", remotingChannel.remoteAddress(), ex);
+ }
+ }
+
+ private boolean needSkip(final String realTopic) {
+ Enumeration topicFilters = subscriptionTable.keys();
+ while (topicFilters.hasMoreElements()) {
+ if (MqttUtil.isMatch(topicFilters.nextElement(), realTopic)) {
+ return false;
+ }
+ }
+ return true;
+ }
- //2. Store msg
- List datas = topicRouteData.getBrokerDatas();
- BrokerData brokerData = datas.get(new Random().nextInt(datas.size()));
- RemotingCommand request = createSendMessageRequest(rootTopic, variableHeader, payload, brokerData.getBrokerName());
- CompletableFuture responseFuture = this.defaultMqttMessageProcessor.getEnodeService().sendMessage(null, brokerData.getBrokerName(), request);
- responseFuture.whenComplete((data, ex) -> {
- if (ex == null) {
- //publish msg to subscribers
- try {
- SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) data.decodeCommandCustomHeader(SendMessageResponseHeader.class);
- //find clients that subscribed this topic from all clients and put it to map.
- Map> snodeAddr2ClientIds = new HashMap<>();
-
- //for clientIds connected to current snode, publish msg directly
- List clientIds = snodeAddr2ClientIds.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1() + this.defaultMqttMessageProcessor.getSnodeConfig().getListenPort());
- IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager();
- for (String clientId : clientIds) {
- Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId);
- Enumeration topicFilters = subscription.getSubscriptionTable().keys();
- while (topicFilters.hasMoreElements()) {
- String topicFilter = topicFilters.nextElement();
- if(MqttUtil.isMatch(topicFilter, variableHeader.topicName())) {
- long offset = this.defaultMqttMessageProcessor.getEnodeService().queryOffset(brokerData.getBrokerName(), clientId, topicFilter, 0);
- if (offset == -1) {
-// this.defaultMqttMessageProcessor.getEnodeService().persistOffset(null, brokerData.getBrokerName(), clientId, 0, );
+ private Integer lowerQosToTheSubscriptionDesired(String publishTopic,
+ Integer publishingQos) {
+ Integer pushQos = Integer.valueOf(publishingQos);
+ Iterator> iterator = subscriptionTable.entrySet().iterator();
+ Integer maxRequestedQos = 0;
+ while (iterator.hasNext()) {
+ final String topicFilter = iterator.next().getKey();
+ if (MqttUtil.isMatch(topicFilter, publishTopic)) {
+ MqttSubscriptionData mqttSubscriptionData = (MqttSubscriptionData) iterator.next().getValue();
+ maxRequestedQos = mqttSubscriptionData.getQos() > maxRequestedQos ? mqttSubscriptionData.getQos() : maxRequestedQos;
+ }
+ }
+ if (publishingQos > maxRequestedQos) {
+ pushQos = maxRequestedQos;
+ }
+ return pushQos;
}
- }
+ };
+
}
+ //for clientIds connected to other snodes, forward msg
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
}
- //for clientIds connected to other snodes, forward msg
-
- } catch (RemotingCommandException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (RemotingConnectException e) {
- e.printStackTrace();
- } catch (RemotingTimeoutException e) {
- e.printStackTrace();
- } catch (RemotingSendRequestException e) {
- e.printStackTrace();
+ } else {
+ log.error("Store Qos=1 Message error: {}", ex);
}
- } else {
- log.error("Store Qos=1 Message error: {}", ex);
- }
- });
+ });
}
- if (fixedHeader.qosLevel().value() > 0) {
- RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
- MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
- if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
- mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
- mqttHeader.setDup(false);
- mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
- mqttHeader.setRetain(false);
- mqttHeader.setRemainingLength(2);
- mqttHeader.setPacketId(0);
- } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
- //PUBREC/PUBREL/PUBCOMP
+ return doResponse(fixedHeader);
+ }
+
+ private void transferMessage(Set snodeAddresses, String topic, ByteBuf payload) throws MqttException {
+ SnodeConfig snodeConfig = defaultMqttMessageProcessor.getSnodeConfig();
+ MqttConfig mqttConfig = defaultMqttMessageProcessor.getMqttConfig();
+ String url = "tcp://" + snodeConfig.getSnodeIP1() + ":" + (mqttConfig.getListenPort() - 1);
+ String clientId = defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1();
+
+ for (String snodeAddress : snodeAddresses) {
+ final Map snode2MqttClient = iotClientManager.getSnode2MqttClient();
+ MqttClient client;
+ if (snode2MqttClient.containsKey(snodeAddresses)) {
+ client = snode2MqttClient.get(snodeAddress);
+ } else {
+ MemoryPersistence persistence = new MemoryPersistence();
+ client = new MqttClient(url, clientId, persistence);
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ client.connect(connOpts);
+ snode2MqttClient.put(snodeAddress, client);
}
- return command;
+
+ byte[] body = new byte[payload.readableBytes()];
+ payload.readBytes(body);
+ org.eclipse.paho.client.mqttv3.MqttMessage message = new org.eclipse.paho.client.mqttv3.MqttMessage(body);
+ message.setQos(0);
+ client.publish(topic, message);
}
- return null;
}
- private RemotingCommand createSendMessageRequest(String rootTopic, MqttPublishVariableHeader variableHeader,
+ private RemotingCommand createSendMessageRequest(String rootTopic, MqttFixedHeader fixedHeader,
+ MqttPublishVariableHeader variableHeader,
ByteBuf payload, String enodeName) {
byte[] body = new byte[payload.readableBytes()];
payload.readBytes(body);
@@ -176,9 +290,20 @@ public class MqttPublishMessageHandler implements MessageHandler {
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setBatch(false);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, variableHeader.topicName());
+ MessageAccessor.putProperty(msg, MqttConstant.PROPERTY_MQTT_QOS, fixedHeader.qosLevel().name());
requestHeader.setEnodeName(enodeName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
request.setBody(msg.getBody());
return request;
}
+
+ private long getMaxOffset(String enodeName,
+ String topic) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
+ GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
+
+ return this.defaultMqttMessageProcessor.getEnodeService().getMaxOffsetInQueue(enodeName, topic, 0, request);
+ }
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
index 31cfd0c2c0774b742e6e599563116fc408e21916..370d7aa7de288aed6e211a8493010a42af80575d 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
@@ -118,8 +118,8 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
//do the logic when client sends subscribe packet.
//1.update clientId2Subscription
ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription();
- ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
- Subscription subscription = null;
+ ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients();
+ Subscription subscription;
if (clientId2Subscription.containsKey(client.getClientId())) {
subscription = clientId2Subscription.get(client.getClientId());
} else {
@@ -133,23 +133,17 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
grantQoss.add(actualQos);
SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName());
subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData);
- //2.update topic2SubscriptionTable
+ //2.update topic2ClientIds
String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
- ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
- if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) {
- client2SubscriptionData = new ConcurrentHashMap<>();
- ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
+ if (topic2Clients.contains(rootTopic)) {
+ final Set clientIds = topic2Clients.get(rootTopic);
+ clientIds.add(client);
+ } else {
+ Set clients = new HashSet<>();
+ clients.add(client);
+ Set prev = topic2Clients.putIfAbsent(rootTopic, clients);
if (prev != null) {
- client2SubscriptionData = prev;
- }
- Set subscriptionDataSet = client2SubscriptionData.get(client);
- if (subscriptionDataSet == null) {
- subscriptionDataSet = new HashSet<>();
- Set prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet);
- if (prevSubscriptionDataSet != null) {
- subscriptionDataSet = prevSubscriptionDataSet;
- }
- subscriptionDataSet.add(subscriptionData);
+ prev.add(client);
}
}
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
index 34c7fdad84092dd670ed1118aba7d510838f919d..c745905d4e4645d303763c4fab2b4adab1ad9dfa 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.client.Client;
@@ -99,27 +100,32 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) {
ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription();
- ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
+ ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients();
+ Subscription subscription = clientId2Subscription.get(client.getClientId());
- for (String topicFilter : topics) {
- //1.update clientId2Subscription
- if (clientId2Subscription.containsKey(client.getClientId())) {
- Subscription subscription = clientId2Subscription.get(client.getClientId());
+ //1.update clientId2Subscription
+ if (clientId2Subscription.containsKey(client.getClientId())) {
+ for (String topicFilter : topics) {
subscription.getSubscriptionTable().remove(topicFilter);
}
- //2.update topic2SubscriptionTable
- String rootTopic = MqttUtil.getRootTopic(topicFilter);
- ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
- if (client2SubscriptionData != null) {
- Set subscriptionDataSet = client2SubscriptionData.get(client);
- if (subscriptionDataSet != null) {
- Iterator iterator = subscriptionDataSet.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().getTopic().equals(topicFilter))
- iterator.remove();
- }
+ }
+
+ for (Iterator>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) {
+ Map.Entry> next = iterator.next();
+ String rootTopic = next.getKey();
+ boolean needRemove = true;
+ for (Map.Entry entry : subscription.getSubscriptionTable().entrySet()) {
+ if (MqttUtil.getRootTopic(entry.getKey()).equals(rootTopic)) {
+ needRemove = false;
+ break;
}
}
+ if (needRemove) {
+ next.getValue().remove(client);
+ }
+ if (next.getValue().size() == 0) {
+ iterator.remove();
+ }
}
}
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
index 24994064ef008d181526b0f325248c67fe2bfb11..fb520e0da2f1c2bb084f346ffbe7685a6e79a28e 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
@@ -30,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
-import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.MqttConfig;
@@ -61,7 +60,6 @@ import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -85,7 +83,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
private EnodeService enodeService;
private NnodeService nnodeService;
- public DefaultMqttMessageProcessor(MqttConfig mqttConfig, SnodeConfig snodeConfig, RemotingServer mqttRemotingServer,
+ public DefaultMqttMessageProcessor(MqttConfig mqttConfig, SnodeConfig snodeConfig,
+ RemotingServer mqttRemotingServer,
EnodeService enodeService, NnodeService nnodeService) {
this.mqttConfig = mqttConfig;
this.snodeConfig = snodeConfig;
@@ -119,7 +118,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
- throws RemotingCommandException, UnsupportedEncodingException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
@@ -132,7 +131,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
-// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
break;
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..0f50e8e252d10fb900c60d8b1e50cc7056d37b7e
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.processor;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.MqttConfig;
+import org.apache.rocketmq.common.SnodeConfig;
+import org.apache.rocketmq.common.client.ClientManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.exception.MQClientException;
+import org.apache.rocketmq.common.service.EnodeService;
+import org.apache.rocketmq.common.service.NnodeService;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttMessageForwarder;
+import org.apache.rocketmq.mqtt.service.WillMessageService;
+import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+
+import static io.netty.handler.codec.mqtt.MqttMessageType.PUBLISH;
+
+public class InnerMqttMessageProcessor implements RequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+
+ private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+ private WillMessageService willMessageService;
+ private MqttPushServiceImpl mqttPushService;
+ private ClientManager iotClientManager;
+ private RemotingServer innerMqttRemotingServer;
+ private MqttConfig mqttConfig;
+ private SnodeConfig snodeConfig;
+ private EnodeService enodeService;
+ private NnodeService nnodeService;
+ private MqttMessageForwarder mqttMessageForwarder;
+
+ public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, RemotingServer innerMqttRemotingServer) {
+ this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
+ this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService();
+ this.mqttPushService = this.defaultMqttMessageProcessor.getMqttPushService();
+ this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager();
+ this.innerMqttRemotingServer = innerMqttRemotingServer;
+ this.enodeService = this.defaultMqttMessageProcessor.getEnodeService();
+ this.nnodeService = this.defaultMqttMessageProcessor.getNnodeService();
+ this.mqttMessageForwarder = new MqttMessageForwarder(this);
+ }
+
+ @Override
+ public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
+ throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
+ if(mqttHeader.getMessageType().equals(PUBLISH)){
+ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
+ mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
+ mqttHeader.getRemainingLength());
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
+ MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody()));
+ return mqttMessageForwarder.handleMessage(mqttMessage, remotingChannel);
+ }else{
+ return defaultMqttMessageProcessor.processRequest(remotingChannel, message);
+ }
+
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+ public WillMessageService getWillMessageService() {
+ return willMessageService;
+ }
+
+ public MqttPushServiceImpl getMqttPushService() {
+ return mqttPushService;
+ }
+
+ public ClientManager getIotClientManager() {
+ return iotClientManager;
+ }
+
+ public MqttConfig getMqttConfig() {
+ return mqttConfig;
+ }
+
+ public void setMqttConfig(MqttConfig mqttConfig) {
+ this.mqttConfig = mqttConfig;
+ }
+
+ public SnodeConfig getSnodeConfig() {
+ return snodeConfig;
+ }
+
+ public void setSnodeConfig(SnodeConfig snodeConfig) {
+ this.snodeConfig = snodeConfig;
+ }
+
+ public EnodeService getEnodeService() {
+ return enodeService;
+ }
+
+ public void setEnodeService(EnodeService enodeService) {
+ this.enodeService = enodeService;
+ }
+
+ public NnodeService getNnodeService() {
+ return nnodeService;
+ }
+
+ public void setNnodeService(NnodeService nnodeService) {
+ this.nnodeService = nnodeService;
+ }
+
+ public DefaultMqttMessageProcessor getDefaultMqttMessageProcessor() {
+ return defaultMqttMessageProcessor;
+ }
+}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
index 9b9b17e10104e2eb3aa7bfe3caad266b5cae4fe8..4e2faacb0481389770d2ed922646ebcd7e2f1fe9 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
@@ -19,11 +19,8 @@ package org.apache.rocketmq.mqtt.service.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.util.ReferenceCountUtil;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,14 +28,11 @@ import org.apache.rocketmq.common.MqttConfig;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
-import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
@@ -49,7 +43,7 @@ public class MqttPushServiceImpl {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private ExecutorService pushMqttMessageExecutorService;
- private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+ private static DefaultMqttMessageProcessor defaultMqttMessageProcessor;
public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) {
this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
@@ -63,21 +57,23 @@ public class MqttPushServiceImpl {
false);
}
- public class MqttPushTask implements Runnable {
+ static class MqttPushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final ByteBuf message;
private final String topic;
private final Integer qos;
private boolean retain;
private Integer packetId;
+ private Client client;
public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain,
- Integer packetId) {
+ Integer packetId, Client client) {
this.message = message;
this.topic = topic;
this.qos = qos;
this.retain = retain;
this.packetId = packetId;
+ this.client = client;
}
@Override
@@ -86,32 +82,14 @@ public class MqttPushServiceImpl {
try {
RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
- //find those clients publishing the message to
- IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
- ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
- Set clients = new HashSet<>();
- if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
- ConcurrentHashMap> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic));
- for (Map.Entry> entry : client2SubscriptionDatas.entrySet()) {
- Set subscriptionDatas = entry.getValue();
- for (SubscriptionData subscriptionData : subscriptionDatas) {
- if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) {
- clients.add(entry.getKey());
- break;
- }
- }
- }
- }
- for (Client client : clients) {
- RemotingChannel remotingChannel = client.getRemotingChannel();
- if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
- remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
- }
- byte[] body = new byte[message.readableBytes()];
- message.readBytes(body);
- requestCommand.setBody(body);
- defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
+ RemotingChannel remotingChannel = client.getRemotingChannel();
+ if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
+ remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
+ byte[] body = new byte[message.readableBytes()];
+ message.readBytes(body);
+ requestCommand.setBody(body);
+ defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
} finally {
@@ -147,15 +125,21 @@ public class MqttPushServiceImpl {
}
- public void pushMessageQos0(final String topic, final ByteBuf message) {
- MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0);
- pushMqttMessageExecutorService.submit(pushTask);
+ public void pushMessageQos0(final String topic, final ByteBuf message, Set clientsTobePublish) {
+ //For clientIds connected to the current snode
+ for (Client client : clientsTobePublish) {
+ MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0, client);
+ pushMqttMessageExecutorService.submit(pushTask);
+ }
+
}
- public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain,
- Integer packetId) {
- MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId);
- pushMqttMessageExecutorService.submit(pushTask);
+ public void pushMessageQos1(final String topic, final ByteBuf message, boolean retain, Integer packetId,
+ Set clientsTobePublish) {
+ for (Client client : clientsTobePublish) {
+ MqttPushTask pushTask = new MqttPushTask(topic, message, 1, retain, packetId, client);
+ pushMqttMessageExecutorService.submit(pushTask);
+ }
}
public void shutdown() {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 03dac9e05d904cb772c86d8a609989a1af834544..2d0ae0c6435c6264af7dfe1f63737e837ce19b4e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -30,10 +30,17 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.service.ClientService;
+import org.apache.rocketmq.common.service.EnodeService;
+import org.apache.rocketmq.common.service.MetricsService;
+import org.apache.rocketmq.common.service.NnodeService;
+import org.apache.rocketmq.common.service.PushService;
+import org.apache.rocketmq.common.service.ScheduledService;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
@@ -62,12 +69,6 @@ import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
-import org.apache.rocketmq.common.service.ClientService;
-import org.apache.rocketmq.common.service.EnodeService;
-import org.apache.rocketmq.common.service.MetricsService;
-import org.apache.rocketmq.common.service.NnodeService;
-import org.apache.rocketmq.common.service.PushService;
-import org.apache.rocketmq.common.service.ScheduledService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
@@ -91,6 +92,7 @@ public class SnodeController {
private RemotingServer snodeServer;
private RemotingClient mqttRemotingClient;
private RemotingServer mqttRemotingServer;
+ private RemotingServer innerMqttRemotingServer;
private ExecutorService sendMessageExecutor;
private ExecutorService handleMqttMessageExecutor;
private ExecutorService heartbeatExecutor;
@@ -101,7 +103,6 @@ public class SnodeController {
private ScheduledService scheduledService;
private ClientManager producerManager;
private ClientManager consumerManager;
-// private ClientManager iotClientManager;
private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
@@ -111,6 +112,7 @@ public class SnodeController {
private PullMessageProcessor pullMessageProcessor;
private HeartbeatProcessor heartbeatProcessor;
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+ private InnerMqttMessageProcessor innerMqttMessageProcessor;
private InterceptorGroup remotingServerInterceptorGroup;
private InterceptorGroup consumeMessageInterceptorGroup;
private InterceptorGroup sendMessageInterceptorGroup;
@@ -118,14 +120,12 @@ public class SnodeController {
private ClientService clientService;
private SlowConsumerService slowConsumerService;
private MetricsService metricsService;
-// private WillMessageService willMessageService;
-// private MqttPushServiceImpl mqttPushService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
- public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) {
+ public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws CloneNotSupportedException {
this.nettyClientConfig = snodeConfig.getNettyClientConfig();
this.nettyServerConfig = snodeConfig.getNettyServerConfig();
this.mqttServerConfig = mqttConfig.getMqttServerConfig();
@@ -155,6 +155,14 @@ public class SnodeController {
this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
+ this.innerMqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
+ RemotingUtil.MQTT_PROTOCOL);
+ ServerConfig innerMqttServerConfig = (ServerConfig)mqttServerConfig.clone();
+ innerMqttServerConfig.setListenPort(mqttServerConfig.getListenPort() - 1);
+ if (this.innerMqttRemotingServer != null) {
+ this.innerMqttRemotingServer.init(innerMqttServerConfig, this.clientHousekeepingService);
+ this.innerMqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
+ }
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
@@ -212,7 +220,8 @@ public class SnodeController {
this.sendMessageProcessor = new SendMessageProcessor(this);
this.heartbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
- this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer, enodeService, nnodeService);
+ this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, this.snodeConfig, mqttRemotingServer, enodeService, nnodeService);
+ this.innerMqttMessageProcessor = new InnerMqttMessageProcessor(this.defaultMqttMessageProcessor, innerMqttRemotingServer);
this.pushService = new PushServiceImpl(this);
this.clientService = new ClientServiceImpl(this);
this.subscriptionManager = new SubscriptionManagerImpl();
@@ -352,6 +361,9 @@ public class SnodeController {
if (mqttRemotingServer != null) {
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
+ if (innerMqttRemotingServer != null) {
+ this.innerMqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, innerMqttMessageProcessor, handleMqttMessageExecutor);
+ }
}
public void start() {