From f8da6ffa63e763db34129782d27b4b205ab08149 Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Tue, 14 May 2019 10:32:18 +0800 Subject: [PATCH] optimize Client of IOT; add in-flight window --- .../apache/rocketmq/common/client/Client.java | 38 +---- .../mqtt/client/IOTClientManagerImpl.java | 43 ++--- .../rocketmq/mqtt/client/InFlightMessage.java | 32 ++++ .../rocketmq/mqtt/client/MQTTSession.java | 159 ++++++++++++++++++ .../mqtt/mqtthandler/MessageHandler.java | 15 +- .../impl/MqttConnectMessageHandler.java | 6 +- .../impl/MqttDisconnectMessageHandler.java | 4 +- .../impl/MqttPingreqMessageHandler.java | 7 +- .../impl/MqttSubscribeMessageHandler.java | 7 +- .../impl/MqttUnsubscribeMessagHandler.java | 5 +- .../mqtt/service/MqttPushService.java | 25 +++ .../service/impl/MqttPushServiceImpl.java | 73 +++----- 12 files changed, 284 insertions(+), 130 deletions(-) create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/service/MqttPushService.java diff --git a/common/src/main/java/org/apache/rocketmq/common/client/Client.java b/common/src/main/java/org/apache/rocketmq/common/client/Client.java index 1719b6bc..0fe42760 100644 --- a/common/src/main/java/org/apache/rocketmq/common/client/Client.java +++ b/common/src/main/java/org/apache/rocketmq/common/client/Client.java @@ -39,13 +39,9 @@ public class Client { private LanguageCode language; - private boolean isConnected; - - private boolean cleanSession; + private String snodeAddress; - private boolean willFlag; - private String snodeAddress; public ClientRole getClientRole() { return clientRole; @@ -70,16 +66,13 @@ public class Client { Objects.equals(groups, client.groups) && Objects.equals(remotingChannel, client.remotingChannel) && language == client.language && - isConnected == client.isConnected && - cleanSession == client.cleanSession && - willFlag == client.willFlag && snodeAddress == client.snodeAddress; } @Override public int hashCode() { return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, - lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress); + lastUpdateTimestamp, version, language, snodeAddress); } public RemotingChannel getRemotingChannel() { @@ -130,30 +123,6 @@ public class Client { this.language = language; } - public boolean isConnected() { - return isConnected; - } - - public void setConnected(boolean connected) { - isConnected = connected; - } - - public boolean isCleanSession() { - return cleanSession; - } - - public void setCleanSession(boolean cleanSession) { - this.cleanSession = cleanSession; - } - - public boolean isWillFlag() { - return willFlag; - } - - public void setWillFlag(boolean willFlag) { - this.willFlag = willFlag; - } - public String getSnodeAddress() { return snodeAddress; } @@ -181,9 +150,6 @@ public class Client { ", lastUpdateTimestamp=" + lastUpdateTimestamp + ", version=" + version + ", language=" + language + - ", isConnected=" + isConnected + - ", cleanSession=" + cleanSession + - ", willFlag=" + willFlag + ", snodeAddress=" + snodeAddress + '}'; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java index fd0507bb..3465f386 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.mqtt.client; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -37,8 +37,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public static final String IOT_GROUP = "IOT_GROUP"; - // private final ConcurrentHashMap>> topic2SubscriptionTable = new ConcurrentHashMap<>( -// 1024); private final ConcurrentHashMap> topic2Clients = new ConcurrentHashMap<>( 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); @@ -47,10 +45,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public IOTClientManagerImpl() { } - public void onUnsubscribe(Client client, List topics) { - //do the logic when client sends unsubscribe packet. - } - @Override public void onClose(Set groups, RemotingChannel remotingChannel) { for (String groupId : groups) { //remove client after invoking onClosed method(client may be used in onClosed) @@ -63,9 +57,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public void onClosed(String group, RemotingChannel remotingChannel) { //do the logic when connection is closed by any reason. //step1. Clean subscription data if cleanSession=1 - Client client = this.getClient(IOT_GROUP, remotingChannel); + MQTTSession client = (MQTTSession) this.getClient(IOT_GROUP, remotingChannel); if (client.isCleanSession()) { cleanSessionState(client.getClientId()); + } else { + client.setConnected(false); + //TODO update persistent store } //step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.) @@ -82,42 +79,30 @@ public class IOTClientManagerImpl extends ClientManagerImpl { } public void cleanSessionState(String clientId) { -/* clientId2Subscription.remove(clientId); - for (Iterator>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry>> next = iterator.next(); - for (Iterator>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) { - Map.Entry> next1 = iterator1.next(); - if (!next1.getKey().getClientId().equals(clientId)) { - continue; - } - iterator1.remove(); - } - if (next.getValue() == null || next.getValue().size() == 0) { - iterator.remove(); - } - }*/ + Map> toBeRemoveFromPersistentStore = new HashMap<>(); for (Iterator>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry> next = iterator.next(); Iterator iterator1 = next.getValue().iterator(); while (iterator1.hasNext()) { - if (iterator1.next().getClientId().equals(clientId)) { + Client client = iterator1.next(); + if (client.getClientId().equals(clientId)) { iterator1.remove(); + Set clients = toBeRemoveFromPersistentStore.getOrDefault((next.getKey()), new HashSet<>()); + clients.add(client); + toBeRemoveFromPersistentStore.put(next.getKey(), clients); } } } + //TODO update persistent store base on toBeRemoveFromPersistentStore clientId2Subscription.remove(clientId); - - //remove offline messages + //TODO update persistent store + //TODO remove offline messages } public Subscription getSubscriptionByClientId(String clientId) { return clientId2Subscription.get(clientId); } - /* public ConcurrentHashMap>> getTopic2SubscriptionTable() { - return topic2SubscriptionTable; - }*/ - public ConcurrentHashMap> getTopic2Clients() { return topic2Clients; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java new file mode 100644 index 00000000..441de315 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt.client; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttQoS; + +public class InFlightMessage { + final String topic; + final MqttQoS publishingQos; + final ByteBuf payload; + + InFlightMessage(String topic, MqttQoS publishingQos, ByteBuf payload) { + this.topic = topic; + this.publishingQos = publishingQos; + this.payload = payload; + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java new file mode 100644 index 00000000..935966e1 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt.client; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.common.client.Subscription; +import org.apache.rocketmq.mqtt.exception.MqttRuntimeException; +import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public class MQTTSession extends Client { + private boolean cleanSession; + private boolean isConnected; + private boolean willFlag; + private final AtomicInteger inflightSlots = new AtomicInteger(10); + private final Map inflightWindow = new HashMap<>(); + private final DelayQueue inflightTimeouts = new DelayQueue<>(); + private final AtomicInteger lastPacketId = new AtomicInteger(0); + private Hashtable inUsePacketIds = new Hashtable(); + private int nextPacketId = 0; + + static class InFlightPacket implements Delayed { + + final int packetId; + private long startTime; + + InFlightPacket(int packetId, long delayInMilliseconds) { + this.packetId = packetId; + this.startTime = System.currentTimeMillis() + delayInMilliseconds; + } + + @Override + public long getDelay(TimeUnit unit) { + long diff = startTime - System.currentTimeMillis(); + return unit.convert(diff, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + if ((this.startTime - ((InFlightPacket) o).startTime) == 0) { + return 0; + } + if ((this.startTime - ((InFlightPacket) o).startTime) > 0) { + return 1; + } else { + return -1; + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Client)) { + return false; + } + Client client = (Client) o; + return Objects.equals(this.getClientId(), client.getClientId()); + } + + public boolean isConnected() { + return isConnected; + } + + public void setConnected(boolean connected) { + isConnected = connected; + } + + public boolean isCleanSession() { + return cleanSession; + } + + public void setCleanSession(boolean cleanSession) { + this.cleanSession = cleanSession; + } + + public boolean isWillFlag() { + return willFlag; + } + + public void setWillFlag(boolean willFlag) { + this.willFlag = willFlag; + } + + public void pushMessageAtQos(MqttHeader mqttHeader, ByteBuf payload, + DefaultMqttMessageProcessor defaultMqttMessageProcessor) { + + if (mqttHeader.getQosLevel() > 0) { +// IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); +// ConcurrentHashMap clientId2Subscription = iotClientManager.getClientId2Subscription(); +// Subscription subscription = clientId2Subscription.get(this.getClientId()); +// Enumeration topicFilters = subscription.getSubscriptionTable().keys(); +// while (topicFilters.hasMoreElements()) { +// String topicFilter = topicFilters.nextElement(); +// } + + inflightSlots.decrementAndGet(); + mqttHeader.setPacketId(getNextPacketId()); + inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), )); + } + defaultMqttMessageProcessor.getMqttPushService().pushMessageQos(mqttHeader, payload, this); + } + + private synchronized void releasePacketId(int msgId) { + this.inUsePacketIds.remove(new Integer(msgId)); + } + + private synchronized int getNextPacketId() { + int startingMessageId = this.nextPacketId; + int loopCount = 0; + + do { + ++this.nextPacketId; + if (this.nextPacketId > 65535) { + this.nextPacketId = 1; + } + + if (this.nextPacketId == startingMessageId) { + ++loopCount; + if (loopCount == 2) { + throw new MqttRuntimeException("Could not get available packetId."); + } + } + } + while (this.inUsePacketIds.containsKey(new Integer(this.nextPacketId))); + + Integer id = new Integer(this.nextPacketId); + this.inUsePacketIds.put(id, id); + return this.nextPacketId; + } +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java index 6d737802..679a2942 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.Subscription; import org.apache.rocketmq.common.exception.MQClientException; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.util.MqttUtil; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -55,12 +56,14 @@ public interface MessageHandler { if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) { Set clients = topic2Clients.get(MqttUtil.getRootTopic(topic)); for (Client client : clients) { - Subscription subscription = clientId2Subscription.get(client.getClientId()); - Enumeration keys = subscription.getSubscriptionTable().keys(); - while (keys.hasMoreElements()) { - String topicFilter = keys.nextElement(); - if (MqttUtil.isMatch(topicFilter, topic)) { - clientsTobePush.add(client); + if(((MQTTSession)client).isConnected()) { + Subscription subscription = clientId2Subscription.get(client.getClientId()); + Enumeration keys = subscription.getSubscriptionTable().keys(); + while (keys.hasMoreElements()) { + String topicFilter = keys.nextElement(); + if (MqttUtil.isMatch(topicFilter, topic)) { + clientsTobePush.add(client); + } } } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java index ecbb1247..36d60c07 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java @@ -25,7 +25,6 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import java.util.HashSet; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.ClientManager; import org.apache.rocketmq.common.client.ClientRole; import org.apache.rocketmq.common.client.Subscription; @@ -35,6 +34,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.exception.MqttConnectException; import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; @@ -117,7 +117,7 @@ public class MqttConnectMessageHandler implements MessageHandler { } } - Client client = new Client(); + MQTTSession client = new MQTTSession(); client.setClientId(payload.clientIdentifier()); client.setClientRole(ClientRole.IOTCLIENT); client.setGroups(new HashSet() { @@ -169,7 +169,7 @@ public class MqttConnectMessageHandler implements MessageHandler { private boolean isConnected(RemotingChannel remotingChannel, String clientId) { ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager(); - Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + MQTTSession client = (MQTTSession) iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { return true; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java index 5f21a4b2..fc0ecd13 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java @@ -20,11 +20,11 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; @@ -56,7 +56,7 @@ public class MqttDisconnectMessageHandler implements MessageHandler { } //discard will message associated with the current connection(client) - Client client = defaultMqttMessageProcessor.getIotClientManager() + MQTTSession client = (MQTTSession)defaultMqttMessageProcessor.getIotClientManager() .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null) { defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId()); diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java index dd1972d5..1cd04333 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java @@ -17,17 +17,14 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl; -import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPubAckMessage; -import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; -import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.remoting.RemotingChannel; @@ -52,7 +49,7 @@ public class MqttPingreqMessageHandler implements MessageHandler { @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); - Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + MQTTSession client = (MQTTSession)iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); log.debug("Handle MQTT client: {} Pingreq.", client.getClientId()); RemotingCommand response = RemotingCommand.createResponseCommand(MqttHeader.class); if (client != null && client.isConnected()) { diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java index 370d7aa7..44f2c579 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.MQTTSession; import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException; import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler; @@ -74,7 +75,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message; MqttSubscribePayload payload = mqttSubscribeMessage.payload(); IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); - Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + MQTTSession client = (MQTTSession)iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client == null) { log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); remotingChannel.close(); @@ -98,6 +99,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); mqttHeader.setMessageType(MqttMessageType.SUBACK.value()); + // dup/qos/retain value are always as below of SUBACK mqttHeader.setDup(false); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setRetain(false); @@ -124,7 +126,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { subscription = clientId2Subscription.get(client.getClientId()); } else { subscription = new Subscription(); - subscription.setCleanSession(client.isCleanSession()); + subscription.setCleanSession(((MQTTSession)client).isCleanSession()); } ConcurrentHashMap subscriptionDatas = subscription.getSubscriptionTable(); List grantQoss = new ArrayList<>(); @@ -147,6 +149,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { } } } + //TODO update persistent store of topic2Clients and clientId2Subscription return grantQoss; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java index c745905d..6913189d 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java @@ -84,7 +84,8 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader(); - mqttHeader.setMessageType(MqttMessageType.SUBACK.value()); + // dup/qos/retain value are always as below of UNSUBACK + mqttHeader.setMessageType(MqttMessageType.UNSUBACK.value()); mqttHeader.setDup(false); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setRetain(false); @@ -127,5 +128,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler { iterator.remove(); } } + + //TODO update persistent store } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/MqttPushService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/MqttPushService.java new file mode 100644 index 00000000..bbc706ef --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/MqttPushService.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.mqtt.service; + +import io.netty.buffer.ByteBuf; +import org.apache.rocketmq.common.client.Client; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; + +public interface MqttPushService { + void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client); +} diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java index 4e2faacb..a87a6b6c 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java @@ -17,9 +17,7 @@ package org.apache.rocketmq.mqtt.service.impl; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.util.ReferenceCountUtil; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -33,13 +31,14 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.mqtt.service.MqttPushService; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; -public class MqttPushServiceImpl { +public class MqttPushServiceImpl implements MqttPushService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); private ExecutorService pushMqttMessageExecutorService; @@ -57,22 +56,23 @@ public class MqttPushServiceImpl { false); } - static class MqttPushTask implements Runnable { + public static class MqttPushTask implements Runnable { private AtomicBoolean canceled = new AtomicBoolean(false); private final ByteBuf message; - private final String topic; - private final Integer qos; - private boolean retain; - private Integer packetId; + private final MqttHeader mqttHeader; private Client client; +// private final String topic; +// private final Integer qos; +// private boolean retain; +// private Integer packetId; - public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain, - Integer packetId, Client client) { + public MqttPushTask(final MqttHeader mqttHeader, final ByteBuf message, Client client) { this.message = message; - this.topic = topic; - this.qos = qos; - this.retain = retain; - this.packetId = packetId; + this.mqttHeader = mqttHeader; +// this.topic = topic; +// this.qos = qos; +// this.retain = retain; +// this.packetId = packetId; this.client = client; } @@ -80,7 +80,7 @@ public class MqttPushServiceImpl { public void run() { if (!canceled.get()) { try { - RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId); + RemotingCommand requestCommand = buildRequestCommand(this.mqttHeader); RemotingChannel remotingChannel = client.getRemotingChannel(); if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) { @@ -91,29 +91,22 @@ public class MqttPushServiceImpl { requestCommand.setBody(body); defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS); } catch (Exception ex) { - log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage()); + log.warn("Exception was thrown when pushing MQTT message to topic: {}, clientId:{}, exception={}", mqttHeader.getTopicName(), client.getClientId(), ex.getMessage()); } finally { ReferenceCountUtil.release(message); } } else { - log.info("Push message to topic: {} canceled!", topic); + log.info("Push message to topic: {}, clientId:{}, canceled!", mqttHeader.getTopicName(), client.getClientId()); } } - private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain, - Integer packetId) { - MqttHeader mqttHeader = new MqttHeader(); - mqttHeader.setMessageType(MqttMessageType.PUBLISH.value()); - if (qos == 0) { - mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages - } else { - mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt. - } - mqttHeader.setQosLevel(qos); - mqttHeader.setRetain(retain); - mqttHeader.setPacketId(packetId); - mqttHeader.setTopicName(topic); - mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes()); + private RemotingCommand buildRequestCommand(MqttHeader mqttHeader) { +// if (qos == 0) { +// mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages +// } else { +// mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt. +// } +// mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes()); RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); return pushMessage; @@ -125,21 +118,9 @@ public class MqttPushServiceImpl { } - public void pushMessageQos0(final String topic, final ByteBuf message, Set clientsTobePublish) { - //For clientIds connected to the current snode - for (Client client : clientsTobePublish) { - MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0, client); - pushMqttMessageExecutorService.submit(pushTask); - } - - } - - public void pushMessageQos1(final String topic, final ByteBuf message, boolean retain, Integer packetId, - Set clientsTobePublish) { - for (Client client : clientsTobePublish) { - MqttPushTask pushTask = new MqttPushTask(topic, message, 1, retain, packetId, client); - pushMqttMessageExecutorService.submit(pushTask); - } + public void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client) { + MqttPushTask pushTask = new MqttPushTask(mqttHeader, message, client); + pushMqttMessageExecutorService.submit(pushTask); } public void shutdown() { -- GitLab