提交 6fb20678 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client 重复订阅优化。

上级 622031c4
......@@ -20,6 +20,8 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
......@@ -29,6 +31,7 @@ import java.util.*;
* @author L.cm
*/
public final class DefaultMqttClientSession implements IMqttClientSession {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientSession.class);
/**
* 订阅的数据承载
*/
......@@ -68,6 +71,7 @@ public final class DefaultMqttClientSession implements IMqttClientSession {
for (MqttClientSubscription subscription : subscriptionSet) {
// 1. 已经存在订阅
if (clientSubscription.equals(subscription)) {
logger.error("MQTT Topic:{} mqttQoS:{} listener:{} duplicate subscription.", topicFilter, mqttQoS, listener);
return true;
}
MqttQoS subQos = subscription.getMqttQoS();
......@@ -77,6 +81,9 @@ public final class DefaultMqttClientSession implements IMqttClientSession {
// 3. 监听器不相同则直接添加
if (subListener != listener) {
subscriptions.add(topicFilter, clientSubscription);
logger.warn("MQTT Topic:{} mqttQoS:{} listener:{} has a higher level qos, added directly.", topicFilter, mqttQoS, listener);
} else {
logger.error("MQTT Topic:{} mqttQoS:{} listener:{} has a higher level qos, duplicate subscription.", topicFilter, mqttQoS, listener);
}
return true;
}
......
......@@ -98,22 +98,22 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) {
// 先判断是否已经订阅过,重复订阅
// 先判断是否已经订阅过,重复订阅,直接跳出
boolean subscribed = clientSession.isSubscribed(topicFilter, mqttQoS, listener);
if (subscribed) {
logger.error("MQTT Topic:{} mqttQoS:{} listener:{} duplicate subscription.", topicFilter, mqttQoS, listener);
} else {
int messageId = MqttClientMessageId.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
.messageId(messageId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message);
Boolean result = Tio.send(context, message);
logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} subscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientSession.addPaddingSubscribe(messageId, pendingSubscription);
return this;
}
// 没有订阅过
int messageId = MqttClientMessageId.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
.messageId(messageId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message);
Boolean result = Tio.send(context, message);
logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} subscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientSession.addPaddingSubscribe(messageId, pendingSubscription);
return this;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册