diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java index 1d1605eb63a649ecc7ebfb8a0d0ab2a2dc21eab8..a69853f1952ea694a7b8805a8c1a9048b4066352 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java @@ -153,16 +153,13 @@ public final class MqttServer { logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId); return false; } - List subscribeList = sessionManager.searchSubscribe(topic, clientId); - if (subscribeList.isEmpty()) { - logger.warn("Mqtt Topic:{} publish but clientId:{} subscribeList is empty.", topic, clientId); + Integer subMqttQoS = sessionManager.searchSubscribe(topic, clientId); + if (subMqttQoS == null) { + logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", topic, clientId); return false; } - for (Subscribe subscribe : subscribeList) { - int subMqttQoS = subscribe.getMqttQoS(); - MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos; - publish(context, clientId, topic, payload, mqttQoS, retain); - } + MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos; + publish(context, clientId, topic, payload, mqttQoS, retain); return true; } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/model/Subscribe.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/model/Subscribe.java index cd88679ea9fb9878f99ccf770802ad2ceb9df03f..df45459e3549dad46661fe400b5ea8dcc15c50a1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/model/Subscribe.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/model/Subscribe.java @@ -32,13 +32,8 @@ public class Subscribe implements Serializable { public Subscribe() { } - public Subscribe(String topicFilter, String clientId) { - this.topicFilter = topicFilter; + public Subscribe(String clientId, int mqttQoS) { this.clientId = clientId; - } - - public Subscribe(String topicFilter, int mqttQoS) { - this.topicFilter = topicFilter; this.mqttQoS = mqttQoS; } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java index 3fe6fe71cf67708ccba7a0ed36e43022ad6e8f8d..cbfd08389f487948d5304a8d256558c62a245288 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java @@ -48,13 +48,13 @@ public interface IMqttSessionManager { void removeSubscribe(String topicFilter, String clientId); /** - * 查找订阅信息 + * 查找订阅 qos 信息 * * @param topicName topicName * @param clientId 客户端 Id * @return 订阅存储列表 */ - List searchSubscribe(String topicName, String clientId); + Integer searchSubscribe(String topicName, String clientId); /** * 查找订阅信息 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java index 75a44c527f86bf7295010e2647e52207ecaee53d..bc75f8376ff939b407ef8959bd61ad8440bbd723 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java @@ -22,10 +22,7 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish; import net.dreamlu.iot.mqtt.core.server.model.Subscribe; import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -77,8 +74,8 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { } @Override - public List searchSubscribe(String topicName, String clientId) { - List list = new ArrayList<>(); + public Integer searchSubscribe(String topicName, String clientId) { + Integer qosValue = null; Set topicFilterSet = subscribeStore.keySet(); for (String topicFilter : topicFilterSet) { if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) { @@ -86,29 +83,39 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { if (data != null && !data.isEmpty()) { Integer mqttQoS = data.get(clientId); if (mqttQoS != null) { - list.add(new Subscribe(topicFilter, mqttQoS)); + if (qosValue == null) { + qosValue = mqttQoS; + } else { + qosValue = Math.min(qosValue, mqttQoS); + } } } } } - return list; + return qosValue; } @Override public List searchSubscribe(String topicName) { - List list = new ArrayList<>(); + // 排除重复订阅,例如: /test/# 和 /# 只发一份 + Map subscribeMap = new HashMap<>(32); Set topicFilterSet = subscribeStore.keySet(); for (String topicFilter : topicFilterSet) { if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) { ConcurrentMap data = subscribeStore.get(topicFilter); if (data != null && !data.isEmpty()) { data.forEach((clientId, qos) -> { - list.add(new Subscribe(topicFilter, clientId, qos)); + subscribeMap.merge(clientId, qos, Math::min); }); } } } - return list; + List subscribeList = new ArrayList<>(); + subscribeMap.forEach((clientId, qos) -> { + subscribeList.add(new Subscribe(clientId, qos)); + }); + subscribeMap.clear(); + return subscribeList; } @Override