From 9f08b3fd2891d4a888e4b439dcfb35dccc025847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Sun, 26 Dec 2021 12:08:52 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/core/client/MqttClient.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index 38ec539..2e1acf2 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -100,20 +100,19 @@ public final class MqttClient { public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) { // 先判断是否已经订阅过,重复订阅,直接跳出 boolean subscribed = clientSession.isSubscribed(topicFilter, mqttQoS, listener); - if (subscribed) { - return this; + if (!subscribed) { + // 没有订阅过 + int messageId = MqttClientMessageId.getId(); + MqttSubscribeMessage message = MqttMessageBuilders.subscribe() + .addSubscription(mqttQoS, topicFilter) + .messageId(messageId) + .build(); + MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message); + Boolean result = Tio.send(context, message); + logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} subscribing result:{}", topicFilter, mqttQoS, messageId, result); + pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message)); + clientSession.addPaddingSubscribe(messageId, pendingSubscription); } - // 没有订阅过 - int messageId = MqttClientMessageId.getId(); - MqttSubscribeMessage message = MqttMessageBuilders.subscribe() - .addSubscription(mqttQoS, topicFilter) - .messageId(messageId) - .build(); - MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message); - Boolean result = Tio.send(context, message); - logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} subscribing result:{}", topicFilter, mqttQoS, messageId, result); - pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message)); - clientSession.addPaddingSubscribe(messageId, pendingSubscription); return this; } -- GitLab