diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java index cd19d749367804934cd4ee6507e7410567dd3272..bb5147bc40f2fa941101228eaa66c418e5f0bb8f 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java @@ -64,7 +64,24 @@ public final class DefaultMqttClientSession implements IMqttClientSession { if (subscriptionSet == null || subscriptionSet.isEmpty()) { return false; } - return subscriptionSet.contains(new MqttClientSubscription(mqttQoS, topicFilter, listener)); + MqttClientSubscription clientSubscription = new MqttClientSubscription(mqttQoS, topicFilter, listener); + for (MqttClientSubscription subscription : subscriptionSet) { + // 1. 已经存在订阅 + if (clientSubscription.equals(subscription)) { + return true; + } + MqttQoS subQos = subscription.getMqttQoS(); + IMqttClientMessageListener subListener = subscription.getListener(); + // 2. 如果已经存在更高或同级别 qos + if (subQos.value() >= mqttQoS.value()) { + // 3. 监听器不相同则直接添加 + if (subListener != listener) { + subscriptions.add(topicFilter, clientSubscription); + } + return true; + } + } + return false; } @Override