From e010ca74d6e19eb6c5ee1c729e7a8be7e79ac356 Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Tue, 7 May 2019 15:02:43 +0800 Subject: [PATCH] polish data structure of IOTClientManager;add message forward logic --- .../consumer/DefaultMQPullConsumerTest.java | 2 +- .../consumer/DefaultMQPushConsumerTest.java | 2 +- common/pom.xml | 4 +- .../apache/rocketmq/common/MqttConfig.java | 1 - mqtt/pom.xml | 4 + .../mqtt/client/IOTClientManagerImpl.java | 33 ++- .../mqtt/mqtthandler/MessageHandler.java | 55 +++- .../impl/MqttMessageForwarder.java | 29 +- .../impl/MqttPublishMessageHandler.java | 269 +++++++++++++----- .../impl/MqttSubscribeMessageHandler.java | 28 +- .../impl/MqttUnsubscribeMessagHandler.java | 38 +-- .../DefaultMqttMessageProcessor.java | 8 +- .../processor/InnerMqttMessageProcessor.java | 144 ++++++++++ .../service/impl/MqttPushServiceImpl.java | 66 ++--- .../rocketmq/snode/SnodeController.java | 34 ++- 15 files changed, 539 insertions(+), 178 deletions(-) create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index f060ad8e..6f378b22 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -51,7 +51,7 @@ import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class DefaultMQPullConsumerTest { @Spy private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 6c910ae0..7e03f000 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -73,7 +73,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; diff --git a/common/pom.xml b/common/pom.xml index 684b4135..cced26d0 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -46,7 +46,7 @@ - org.apache.rocketmq + ${project.groupId} rocketmq-remoting @@ -54,5 +54,5 @@ snakeyaml - + diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java index 2966a3ba..72dcaae4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -60,7 +60,6 @@ public class MqttConfig { this.listenPort = listenPort; } - public boolean isAclEnable() { return aclEnable; } diff --git a/mqtt/pom.xml b/mqtt/pom.xml index 01b19b82..13678904 100644 --- a/mqtt/pom.xml +++ b/mqtt/pom.xml @@ -99,5 +99,9 @@ ${project.groupId} rocketmq-broker + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java index 5f56d16b..fd0507bb 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.mqtt.client; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -25,10 +26,10 @@ import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; +import org.eclipse.paho.client.mqttv3.MqttClient; public class IOTClientManagerImpl extends ClientManagerImpl { @@ -36,9 +37,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public static final String IOT_GROUP = "IOT_GROUP"; - private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( + // private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( +// 1024); + private final ConcurrentHashMap> topic2Clients = new ConcurrentHashMap<>( 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); + private final Map snode2MqttClient = new HashMap<>(); public IOTClientManagerImpl() { } @@ -78,7 +82,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl { } public void cleanSessionState(String clientId) { - clientId2Subscription.remove(clientId); +/* clientId2Subscription.remove(clientId); for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry>> next = iterator.next(); for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) { @@ -91,7 +95,18 @@ public class IOTClientManagerImpl extends ClientManagerImpl { if (next.getValue() == null || next.getValue().size() == 0) { iterator.remove(); } + }*/ + for (Iterator>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry> next = iterator.next(); + Iterator iterator1 = next.getValue().iterator(); + while (iterator1.hasNext()) { + if (iterator1.next().getClientId().equals(clientId)) { + iterator1.remove(); + } + } } + clientId2Subscription.remove(clientId); + //remove offline messages } @@ -99,8 +114,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl { return clientId2Subscription.get(clientId); } - public ConcurrentHashMap>> getTopic2SubscriptionTable() { - return topic2SubscriptionTable; + /* public ConcurrentHashMap>> getTopic2SubscriptionTable() { + return topic2SubscriptionTable; + }*/ + + public ConcurrentHashMap> getTopic2Clients() { + return topic2Clients; } public ConcurrentHashMap getClientId2Subscription() { @@ -110,4 +129,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public void initSubscription(String clientId, Subscription subscription) { clientId2Subscription.put(clientId, subscription); } + + public Map getSnode2MqttClient() { + return snode2MqttClient; + } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java index 2598fdb4..6d737802 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java @@ -17,13 +17,25 @@ package org.apache.rocketmq.mqtt.mqtthandler; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.exception.MQClientException; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; public interface MessageHandler { @@ -32,5 +44,46 @@ public interface MessageHandler { * * @param message */ - RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + RemotingCommand handleMessage(MqttMessage message, + RemotingChannel remotingChannel) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + + default Set findCurrentNodeClientsTobePublish(String topic, IOTClientManagerImpl iotClientManager) { + //find those clients publishing the message to + ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients(); + ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); + Set clientsTobePush = new HashSet<>(); + if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) { + Set clients = topic2Clients.get(MqttUtil.getRootTopic(topic)); + for (Client client : clients) { + Subscription subscription = clientId2Subscription.get(client.getClientId()); + Enumeration keys = subscription.getSubscriptionTable().keys(); + while (keys.hasMoreElements()) { + String topicFilter = keys.nextElement(); + if (MqttUtil.isMatch(topicFilter, topic)) { + clientsTobePush.add(client); + } + } + } + } + return clientsTobePush; + } + + default RemotingCommand doResponse(MqttFixedHeader fixedHeader) { + if (fixedHeader.qosLevel().value() > 0) { + RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); + MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); + if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { + mqttHeader.setMessageType(MqttMessageType.PUBACK.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); + mqttHeader.setRetain(false); + mqttHeader.setRemainingLength(2); + mqttHeader.setPacketId(0); + } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { + //PUBREC/PUBREL/PUBCOMP + } + return command; + } + return null; + } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java index 7cc9d029..5718fadb 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java @@ -17,21 +17,29 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.Set; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; -import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class MqttMessageForwarder implements MessageHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); - private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private final InnerMqttMessageProcessor innerMqttMessageProcessor; - public MqttMessageForwarder(DefaultMqttMessageProcessor processor) { - this.defaultMqttMessageProcessor = processor; + public MqttMessageForwarder(InnerMqttMessageProcessor processor) { + this.innerMqttMessageProcessor = processor; } /** @@ -41,6 +49,17 @@ public class MqttMessageForwarder implements MessageHandler { * @return whether the message is handled successfully */ @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { - return null; + MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message; + MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader(); + MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); + if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { + ByteBuf payload = mqttPublishMessage.payload(); + //Publish message to clients + Set clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), (IOTClientManagerImpl) this.innerMqttMessageProcessor.getIotClientManager()); + innerMqttMessageProcessor.getDefaultMqttMessageProcessor().getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload, clientsTobePublish); + }else if(fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)){ + //TODO + } + return doResponse(fixedHeader); } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java index 43cea3fc..a8900ce3 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java @@ -23,13 +23,21 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; -import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; +import java.nio.ByteBuffer; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.exception.MQClientException; @@ -37,14 +45,22 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.service.EnodeService; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; @@ -56,13 +72,21 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttPublishMessageHandler implements MessageHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private IOTClientManagerImpl iotClientManager; + private EnodeService enodeService; public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) { this.defaultMqttMessageProcessor = processor; + this.iotClientManager = (IOTClientManagerImpl) processor.getIotClientManager(); + this.enodeService = this.defaultMqttMessageProcessor.getEnodeService(); } @Override @@ -82,88 +106,178 @@ public class MqttPublishMessageHandler implements MessageHandler { } ByteBuf payload = mqttPublishMessage.payload(); - if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { - defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload); - } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { - // Store msg and invoke callback to publish msg to subscribers - // 1. Check if the root topic has been created - String rootTopic = MqttUtil.getRootTopic(variableHeader.topicName()); - TopicRouteData topicRouteData = null; - - try { - topicRouteData = this.defaultMqttMessageProcessor.getNnodeService().getTopicRouteDataByTopic(rootTopic, false); - } catch (MQClientException e) { - log.error("The rootTopic {} does not exist. Please create it first.", rootTopic); - throw new MQClientException(e.getResponseCode(), e.getErrorMessage()); - } + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setTopicName(variableHeader.topicName()); + mqttHeader.setMessageType(MqttMessageType.PUBLISH.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(fixedHeader.qosLevel().value()); + mqttHeader.setRetain(false); //set to false tempararily, need to be implemented. + switch (fixedHeader.qosLevel()) { + case AT_MOST_ONCE: + //For clients connected to the current snode and isConnected is true + Set clientsTobePublish = findCurrentNodeClientsTobePublish(variableHeader.topicName(), this.iotClientManager); + + for (Client client : clientsTobePublish) { + ((MQTTSession) client).pushMessageAtQos(mqttHeader, payload, this.defaultMqttMessageProcessor); + } + + //For clients that connected to other snodes, transfer the message to them + //pseudo code: + //step1. find clients that match the publish topic + //step2. remove the clients that connected to the current node(clientsTobePublish) + //step2. get snode ips by clients in step2. + Set snodeIpsTobeTransfer = new HashSet<>(); + try { + transferMessage(snodeIpsTobeTransfer, variableHeader.topicName(), payload); + } catch (MqttException e) { + log.error("Transfer message failed: {}", e.getMessage()); + } finally { + ReferenceCountUtil.release(message); + } + case AT_LEAST_ONCE: + // Store msg and invoke callback to publish msg to subscribers + // 1. Check if the root topic has been created + String rootTopic = MqttUtil.getRootTopic(variableHeader.topicName()); + TopicRouteData topicRouteData = null; + + try { + topicRouteData = this.defaultMqttMessageProcessor.getNnodeService().getTopicRouteDataByTopic(rootTopic, false); + } catch (MQClientException e) { + log.error("The rootTopic {} does not exist. Please create it first.", rootTopic); + throw new MQClientException(e.getResponseCode(), e.getErrorMessage()); + } + + //2. Store msg + List datas = topicRouteData.getBrokerDatas(); + //select a broker randomly, need to be optimized here + BrokerData brokerData = datas.get(new Random().nextInt(datas.size())); + RemotingCommand request = createSendMessageRequest(rootTopic, fixedHeader, variableHeader, payload, brokerData.getBrokerName()); + CompletableFuture responseFuture = this.defaultMqttMessageProcessor.getEnodeService().sendMessage(null, brokerData.getBrokerName(), request); + responseFuture.whenComplete((data, ex) -> { + if (ex == null) { + //publish msg to subscribers + try { + SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) data.decodeCommandCustomHeader(SendMessageResponseHeader.class); + //find clients that subscribed this topic from all snodes and put it to map. + Map> snodeAddr2Clients = new HashMap<>(); + + //for clientIds connected to current snode, trigger the logic of push message + List clients = snodeAddr2Clients.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1()); + for (Client client : clients) { + Subscription subscription = this.iotClientManager.getSubscriptionByClientId(client.getClientId()); + ConcurrentHashMap subscriptionTable = subscription.getSubscriptionTable(); + + //for each client, wrap a task: pull messages from commitlog one by one, and push them if current client subscribe it. + Runnable task = new Runnable() { + + @Override + public void run() { + //compare current consumeOffset of rootTopic@clientId with maxOffset, pull message if consumeOffset < maxOffset + long maxOffsetInQueue; + try { + maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); + long consumeOffset = enodeService.queryOffset(brokerData.getBrokerName(), client.getClientId(), rootTopic, 0); + long i = consumeOffset; + while (i < maxOffsetInQueue) { + //TODO query messages from enode + RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null); + ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); + MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); + final String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); + + boolean needSkip = needSkip(realTopic); + if (needSkip) { + log.info("Current client doesn't subscribe topic:{}, skip this message", realTopic); + maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); + i += 1; + continue; + } + Integer pushQos = lowerQosToTheSubscriptionDesired(realTopic, Integer.valueOf(messageExt.getProperty(MqttConstant.PROPERTY_MQTT_QOS))); + mqttHeader.setQosLevel(pushQos); + //push message + MQTTSession mqttSession = (MQTTSession) client; + mqttSession.pushMessageAtQos(mqttHeader, payload, defaultMqttMessageProcessor); + + maxOffsetInQueue = getMaxOffset(brokerData.getBrokerName(), rootTopic); + i += 1; + } + } catch (Exception ex) { + log.error("Get max offset error, remoting: {} error: {} ", remotingChannel.remoteAddress(), ex); + } + } + + private boolean needSkip(final String realTopic) { + Enumeration topicFilters = subscriptionTable.keys(); + while (topicFilters.hasMoreElements()) { + if (MqttUtil.isMatch(topicFilters.nextElement(), realTopic)) { + return false; + } + } + return true; + } - //2. Store msg - List datas = topicRouteData.getBrokerDatas(); - BrokerData brokerData = datas.get(new Random().nextInt(datas.size())); - RemotingCommand request = createSendMessageRequest(rootTopic, variableHeader, payload, brokerData.getBrokerName()); - CompletableFuture responseFuture = this.defaultMqttMessageProcessor.getEnodeService().sendMessage(null, brokerData.getBrokerName(), request); - responseFuture.whenComplete((data, ex) -> { - if (ex == null) { - //publish msg to subscribers - try { - SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) data.decodeCommandCustomHeader(SendMessageResponseHeader.class); - //find clients that subscribed this topic from all clients and put it to map. - Map> snodeAddr2ClientIds = new HashMap<>(); - - //for clientIds connected to current snode, publish msg directly - List clientIds = snodeAddr2ClientIds.get(this.defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1() + this.defaultMqttMessageProcessor.getSnodeConfig().getListenPort()); - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager(); - for (String clientId : clientIds) { - Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId); - Enumeration topicFilters = subscription.getSubscriptionTable().keys(); - while (topicFilters.hasMoreElements()) { - String topicFilter = topicFilters.nextElement(); - if(MqttUtil.isMatch(topicFilter, variableHeader.topicName())) { - long offset = this.defaultMqttMessageProcessor.getEnodeService().queryOffset(brokerData.getBrokerName(), clientId, topicFilter, 0); - if (offset == -1) { -// this.defaultMqttMessageProcessor.getEnodeService().persistOffset(null, brokerData.getBrokerName(), clientId, 0, ); + private Integer lowerQosToTheSubscriptionDesired(String publishTopic, + Integer publishingQos) { + Integer pushQos = Integer.valueOf(publishingQos); + Iterator> iterator = subscriptionTable.entrySet().iterator(); + Integer maxRequestedQos = 0; + while (iterator.hasNext()) { + final String topicFilter = iterator.next().getKey(); + if (MqttUtil.isMatch(topicFilter, publishTopic)) { + MqttSubscriptionData mqttSubscriptionData = (MqttSubscriptionData) iterator.next().getValue(); + maxRequestedQos = mqttSubscriptionData.getQos() > maxRequestedQos ? mqttSubscriptionData.getQos() : maxRequestedQos; + } + } + if (publishingQos > maxRequestedQos) { + pushQos = maxRequestedQos; + } + return pushQos; } - } + }; + } + //for clientIds connected to other snodes, forward msg + } catch (RemotingCommandException e) { + e.printStackTrace(); } - //for clientIds connected to other snodes, forward msg - - } catch (RemotingCommandException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (RemotingConnectException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); + } else { + log.error("Store Qos=1 Message error: {}", ex); } - } else { - log.error("Store Qos=1 Message error: {}", ex); - } - }); + }); } - if (fixedHeader.qosLevel().value() > 0) { - RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); - MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); - if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) { - mqttHeader.setMessageType(MqttMessageType.PUBACK.value()); - mqttHeader.setDup(false); - mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); - mqttHeader.setRetain(false); - mqttHeader.setRemainingLength(2); - mqttHeader.setPacketId(0); - } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) { - //PUBREC/PUBREL/PUBCOMP + return doResponse(fixedHeader); + } + + private void transferMessage(Set snodeAddresses, String topic, ByteBuf payload) throws MqttException { + SnodeConfig snodeConfig = defaultMqttMessageProcessor.getSnodeConfig(); + MqttConfig mqttConfig = defaultMqttMessageProcessor.getMqttConfig(); + String url = "tcp://" + snodeConfig.getSnodeIP1() + ":" + (mqttConfig.getListenPort() - 1); + String clientId = defaultMqttMessageProcessor.getSnodeConfig().getSnodeIP1(); + + for (String snodeAddress : snodeAddresses) { + final Map snode2MqttClient = iotClientManager.getSnode2MqttClient(); + MqttClient client; + if (snode2MqttClient.containsKey(snodeAddresses)) { + client = snode2MqttClient.get(snodeAddress); + } else { + MemoryPersistence persistence = new MemoryPersistence(); + client = new MqttClient(url, clientId, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + client.connect(connOpts); + snode2MqttClient.put(snodeAddress, client); } - return command; + + byte[] body = new byte[payload.readableBytes()]; + payload.readBytes(body); + org.eclipse.paho.client.mqttv3.MqttMessage message = new org.eclipse.paho.client.mqttv3.MqttMessage(body); + message.setQos(0); + client.publish(topic, message); } - return null; } - private RemotingCommand createSendMessageRequest(String rootTopic, MqttPublishVariableHeader variableHeader, + private RemotingCommand createSendMessageRequest(String rootTopic, MqttFixedHeader fixedHeader, + MqttPublishVariableHeader variableHeader, ByteBuf payload, String enodeName) { byte[] body = new byte[payload.readableBytes()]; payload.readBytes(body); @@ -176,9 +290,20 @@ public class MqttPublishMessageHandler implements MessageHandler { requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setBatch(false); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, variableHeader.topicName()); + MessageAccessor.putProperty(msg, MqttConstant.PROPERTY_MQTT_QOS, fixedHeader.qosLevel().name()); requestHeader.setEnodeName(enodeName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); request.setBody(msg.getBody()); return request; } + + private long getMaxOffset(String enodeName, + String topic) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException { + GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(0); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); + + return this.defaultMqttMessageProcessor.getEnodeService().getMaxOffsetInQueue(enodeName, topic, 0, request); + } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java index 31cfd0c2..370d7aa7 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java @@ -118,8 +118,8 @@ public class MqttSubscribeMessageHandler implements MessageHandler { //do the logic when client sends subscribe packet. //1.update clientId2Subscription ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); - ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); - Subscription subscription = null; + ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients(); + Subscription subscription; if (clientId2Subscription.containsKey(client.getClientId())) { subscription = clientId2Subscription.get(client.getClientId()); } else { @@ -133,23 +133,17 @@ public class MqttSubscribeMessageHandler implements MessageHandler { grantQoss.add(actualQos); SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName()); subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData); - //2.update topic2SubscriptionTable + //2.update topic2ClientIds String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName()); - ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); - if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) { - client2SubscriptionData = new ConcurrentHashMap<>(); - ConcurrentHashMap> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData); + if (topic2Clients.contains(rootTopic)) { + final Set clientIds = topic2Clients.get(rootTopic); + clientIds.add(client); + } else { + Set clients = new HashSet<>(); + clients.add(client); + Set prev = topic2Clients.putIfAbsent(rootTopic, clients); if (prev != null) { - client2SubscriptionData = prev; - } - Set subscriptionDataSet = client2SubscriptionData.get(client); - if (subscriptionDataSet == null) { - subscriptionDataSet = new HashSet<>(); - Set prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet); - if (prevSubscriptionDataSet != null) { - subscriptionDataSet = prevSubscriptionDataSet; - } - subscriptionDataSet.add(subscriptionData); + prev.add(client); } } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java index 34c7fdad..c745905d 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.client.Client; @@ -99,27 +100,32 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { private void doUnsubscribe(Client client, List topics, IOTClientManagerImpl iotClientManager) { ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); - ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); + ConcurrentHashMap> topic2Clients = iotClientManager.getTopic2Clients(); + Subscription subscription = clientId2Subscription.get(client.getClientId()); - for (String topicFilter : topics) { - //1.update clientId2Subscription - if (clientId2Subscription.containsKey(client.getClientId())) { - Subscription subscription = clientId2Subscription.get(client.getClientId()); + //1.update clientId2Subscription + if (clientId2Subscription.containsKey(client.getClientId())) { + for (String topicFilter : topics) { subscription.getSubscriptionTable().remove(topicFilter); } - //2.update topic2SubscriptionTable - String rootTopic = MqttUtil.getRootTopic(topicFilter); - ConcurrentHashMap> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic); - if (client2SubscriptionData != null) { - Set subscriptionDataSet = client2SubscriptionData.get(client); - if (subscriptionDataSet != null) { - Iterator iterator = subscriptionDataSet.iterator(); - while (iterator.hasNext()) { - if (iterator.next().getTopic().equals(topicFilter)) - iterator.remove(); - } + } + + for (Iterator>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry> next = iterator.next(); + String rootTopic = next.getKey(); + boolean needRemove = true; + for (Map.Entry entry : subscription.getSubscriptionTable().entrySet()) { + if (MqttUtil.getRootTopic(entry.getKey()).equals(rootTopic)) { + needRemove = false; + break; } } + if (needRemove) { + next.getValue().remove(client); + } + if (next.getValue().size() == 0) { + iterator.remove(); + } } } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java index 24994064..fb520e0d 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java @@ -30,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttSubscribePayload; -import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; import org.apache.rocketmq.common.MqttConfig; @@ -61,7 +60,6 @@ import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RequestProcessor; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -85,7 +83,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { private EnodeService enodeService; private NnodeService nnodeService; - public DefaultMqttMessageProcessor(MqttConfig mqttConfig, SnodeConfig snodeConfig, RemotingServer mqttRemotingServer, + public DefaultMqttMessageProcessor(MqttConfig mqttConfig, SnodeConfig snodeConfig, + RemotingServer mqttRemotingServer, EnodeService enodeService, NnodeService nnodeService) { this.mqttConfig = mqttConfig; this.snodeConfig = snodeConfig; @@ -119,7 +118,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) - throws RemotingCommandException, UnsupportedEncodingException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), @@ -132,7 +131,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); -// MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload(); MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) MqttEncodeDecodeUtil.decode(message.getBody(), MqttConnectPayload.class); mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload); break; diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java new file mode 100644 index 00000000..0f50e8e2 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/InnerMqttMessageProcessor.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.processor; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.rocketmq.common.MqttConfig; +import org.apache.rocketmq.common.SnodeConfig; +import org.apache.rocketmq.common.client.ClientManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.exception.MQClientException; +import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttMessageForwarder; +import org.apache.rocketmq.mqtt.service.WillMessageService; +import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +import static io.netty.handler.codec.mqtt.MqttMessageType.PUBLISH; + +public class InnerMqttMessageProcessor implements RequestProcessor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); + + private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private WillMessageService willMessageService; + private MqttPushServiceImpl mqttPushService; + private ClientManager iotClientManager; + private RemotingServer innerMqttRemotingServer; + private MqttConfig mqttConfig; + private SnodeConfig snodeConfig; + private EnodeService enodeService; + private NnodeService nnodeService; + private MqttMessageForwarder mqttMessageForwarder; + + public InnerMqttMessageProcessor(DefaultMqttMessageProcessor defaultMqttMessageProcessor, RemotingServer innerMqttRemotingServer) { + this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; + this.willMessageService = this.defaultMqttMessageProcessor.getWillMessageService(); + this.mqttPushService = this.defaultMqttMessageProcessor.getMqttPushService(); + this.iotClientManager = this.defaultMqttMessageProcessor.getIotClientManager(); + this.innerMqttRemotingServer = innerMqttRemotingServer; + this.enodeService = this.defaultMqttMessageProcessor.getEnodeService(); + this.nnodeService = this.defaultMqttMessageProcessor.getNnodeService(); + this.mqttMessageForwarder = new MqttMessageForwarder(this); + } + + @Override + public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) + throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException { + MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader(); + if(mqttHeader.getMessageType().equals(PUBLISH)){ + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), + mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()); + MqttMessage mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(message.getBody())); + return mqttMessageForwarder.handleMessage(mqttMessage, remotingChannel); + }else{ + return defaultMqttMessageProcessor.processRequest(remotingChannel, message); + } + + } + + @Override + public boolean rejectRequest() { + return false; + } + + public WillMessageService getWillMessageService() { + return willMessageService; + } + + public MqttPushServiceImpl getMqttPushService() { + return mqttPushService; + } + + public ClientManager getIotClientManager() { + return iotClientManager; + } + + public MqttConfig getMqttConfig() { + return mqttConfig; + } + + public void setMqttConfig(MqttConfig mqttConfig) { + this.mqttConfig = mqttConfig; + } + + public SnodeConfig getSnodeConfig() { + return snodeConfig; + } + + public void setSnodeConfig(SnodeConfig snodeConfig) { + this.snodeConfig = snodeConfig; + } + + public EnodeService getEnodeService() { + return enodeService; + } + + public void setEnodeService(EnodeService enodeService) { + this.enodeService = enodeService; + } + + public NnodeService getNnodeService() { + return nnodeService; + } + + public void setNnodeService(NnodeService nnodeService) { + this.nnodeService = nnodeService; + } + + public DefaultMqttMessageProcessor getDefaultMqttMessageProcessor() { + return defaultMqttMessageProcessor; + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java index 9b9b17e1..4e2faacb 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java @@ -19,11 +19,8 @@ package org.apache.rocketmq.mqtt.service.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.util.ReferenceCountUtil; -import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,14 +28,11 @@ import org.apache.rocketmq.common.MqttConfig; import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; -import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; @@ -49,7 +43,7 @@ public class MqttPushServiceImpl { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private ExecutorService pushMqttMessageExecutorService; - private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private static DefaultMqttMessageProcessor defaultMqttMessageProcessor; public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) { this.defaultMqttMessageProcessor = defaultMqttMessageProcessor; @@ -63,21 +57,23 @@ public class MqttPushServiceImpl { false); } - public class MqttPushTask implements Runnable { + static class MqttPushTask implements Runnable { private AtomicBoolean canceled = new AtomicBoolean(false); private final ByteBuf message; private final String topic; private final Integer qos; private boolean retain; private Integer packetId; + private Client client; public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain, - Integer packetId) { + Integer packetId, Client client) { this.message = message; this.topic = topic; this.qos = qos; this.retain = retain; this.packetId = packetId; + this.client = client; } @Override @@ -86,32 +82,14 @@ public class MqttPushServiceImpl { try { RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); - //find those clients publishing the message to - IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); - ConcurrentHashMap>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); - Set clients = new HashSet<>(); - if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) { - ConcurrentHashMap> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic)); - for (Map.Entry> entry : client2SubscriptionDatas.entrySet()) { - Set subscriptionDatas = entry.getValue(); - for (SubscriptionData subscriptionData : subscriptionDatas) { - if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) { - clients.add(entry.getKey()); - break; - } - } - } - } - for (Client client : clients) { - RemotingChannel remotingChannel = client.getRemotingChannel(); - if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) { - remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel()); - } - byte[] body = new byte[message.readableBytes()]; - message.readBytes(body); - requestCommand.setBody(body); - defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS); + RemotingChannel remotingChannel = client.getRemotingChannel(); + if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) { + remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel()); } + byte[] body = new byte[message.readableBytes()]; + message.readBytes(body); + requestCommand.setBody(body); + defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS); } catch (Exception ex) { log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); } finally { @@ -147,15 +125,21 @@ public class MqttPushServiceImpl { } - public void pushMessageQos0(final String topic, final ByteBuf message) { - MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0); - pushMqttMessageExecutorService.submit(pushTask); + public void pushMessageQos0(final String topic, final ByteBuf message, Set clientsTobePublish) { + //For clientIds connected to the current snode + for (Client client : clientsTobePublish) { + MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0, client); + pushMqttMessageExecutorService.submit(pushTask); + } + } - public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain, - Integer packetId) { - MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId); - pushMqttMessageExecutorService.submit(pushTask); + public void pushMessageQos1(final String topic, final ByteBuf message, boolean retain, Integer packetId, + Set clientsTobePublish) { + for (Client client : clientsTobePublish) { + MqttPushTask pushTask = new MqttPushTask(topic, message, 1, retain, packetId, client); + pushMqttMessageExecutorService.submit(pushTask); + } } public void shutdown() { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 03dac9e0..2d0ae0c6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -30,10 +30,17 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.service.ClientService; +import org.apache.rocketmq.common.service.EnodeService; +import org.apache.rocketmq.common.service.MetricsService; +import org.apache.rocketmq.common.service.NnodeService; +import org.apache.rocketmq.common.service.PushService; +import org.apache.rocketmq.common.service.ScheduledService; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.processor.InnerMqttMessageProcessor; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClientFactory; @@ -62,12 +69,6 @@ import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.PullMessageProcessor; import org.apache.rocketmq.snode.processor.SendMessageProcessor; -import org.apache.rocketmq.common.service.ClientService; -import org.apache.rocketmq.common.service.EnodeService; -import org.apache.rocketmq.common.service.MetricsService; -import org.apache.rocketmq.common.service.NnodeService; -import org.apache.rocketmq.common.service.PushService; -import org.apache.rocketmq.common.service.ScheduledService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; @@ -91,6 +92,7 @@ public class SnodeController { private RemotingServer snodeServer; private RemotingClient mqttRemotingClient; private RemotingServer mqttRemotingServer; + private RemotingServer innerMqttRemotingServer; private ExecutorService sendMessageExecutor; private ExecutorService handleMqttMessageExecutor; private ExecutorService heartbeatExecutor; @@ -101,7 +103,6 @@ public class SnodeController { private ScheduledService scheduledService; private ClientManager producerManager; private ClientManager consumerManager; -// private ClientManager iotClientManager; private SubscriptionManager subscriptionManager; private ClientHousekeepingService clientHousekeepingService; private SubscriptionGroupManager subscriptionGroupManager; @@ -111,6 +112,7 @@ public class SnodeController { private PullMessageProcessor pullMessageProcessor; private HeartbeatProcessor heartbeatProcessor; private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + private InnerMqttMessageProcessor innerMqttMessageProcessor; private InterceptorGroup remotingServerInterceptorGroup; private InterceptorGroup consumeMessageInterceptorGroup; private InterceptorGroup sendMessageInterceptorGroup; @@ -118,14 +120,12 @@ public class SnodeController { private ClientService clientService; private SlowConsumerService slowConsumerService; private MetricsService metricsService; -// private WillMessageService willMessageService; -// private MqttPushServiceImpl mqttPushService; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "SnodeControllerScheduledThread")); - public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) { + public SnodeController(SnodeConfig snodeConfig, MqttConfig mqttConfig) throws CloneNotSupportedException { this.nettyClientConfig = snodeConfig.getNettyClientConfig(); this.nettyServerConfig = snodeConfig.getNettyServerConfig(); this.mqttServerConfig = mqttConfig.getMqttServerConfig(); @@ -155,6 +155,14 @@ public class SnodeController { this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService); this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); } + this.innerMqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( + RemotingUtil.MQTT_PROTOCOL); + ServerConfig innerMqttServerConfig = (ServerConfig)mqttServerConfig.clone(); + innerMqttServerConfig.setListenPort(mqttServerConfig.getListenPort() - 1); + if (this.innerMqttRemotingServer != null) { + this.innerMqttRemotingServer.init(innerMqttServerConfig, this.clientHousekeepingService); + this.innerMqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); + } this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(), @@ -212,7 +220,8 @@ public class SnodeController { this.sendMessageProcessor = new SendMessageProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this); - this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer, enodeService, nnodeService); + this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, this.snodeConfig, mqttRemotingServer, enodeService, nnodeService); + this.innerMqttMessageProcessor = new InnerMqttMessageProcessor(this.defaultMqttMessageProcessor, innerMqttRemotingServer); this.pushService = new PushServiceImpl(this); this.clientService = new ClientServiceImpl(this); this.subscriptionManager = new SubscriptionManagerImpl(); @@ -352,6 +361,9 @@ public class SnodeController { if (mqttRemotingServer != null) { this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); } + if (innerMqttRemotingServer != null) { + this.innerMqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, innerMqttMessageProcessor, handleMqttMessageExecutor); + } } public void start() { -- GitLab