From ca44c6308586773819af5c8bbed144ebad0acc09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=85=E6=A2=A6?= <1101766085@qq.com> Date: Tue, 28 Dec 2021 15:14:57 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20mica-mqtt=20client=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=89=B9=E9=87=8F=E5=8F=96=E6=B6=88=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/codec/MqttMessageBuilders.java | 38 ++++++++++--------- .../client/DefaultMqttClientProcessor.java | 4 +- .../core/client/DefaultMqttClientSession.java | 4 +- .../mqtt/core/client/IMqttClientSession.java | 4 +- .../iot/mqtt/core/client/MqttClient.java | 22 ++++++++--- .../client/MqttPendingUnSubscription.java | 11 +++--- 6 files changed, 50 insertions(+), 33 deletions(-) diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java index 1e77bec..d256f9a 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java @@ -21,6 +21,7 @@ import net.dreamlu.iot.mqtt.codec.MqttProperties.MqttPropertyType; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -200,22 +201,29 @@ public final class MqttMessageBuilders { public static final class SubscribeBuilder { - private List subscriptions; + private final List subscriptions; private int messageId; private MqttProperties properties; SubscribeBuilder() { + subscriptions = new ArrayList<>(5); } - public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { - ensureSubscriptionsExist(); - subscriptions.add(new MqttTopicSubscription(topic, qos)); + public SubscribeBuilder addSubscription(MqttTopicSubscription subscription) { + subscriptions.add(subscription); return this; } + public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { + return addSubscription(new MqttTopicSubscription(topic, qos)); + } + public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) { - ensureSubscriptionsExist(); - subscriptions.add(new MqttTopicSubscription(topic, option)); + return addSubscription(new MqttTopicSubscription(topic, option)); + } + + public SubscribeBuilder addSubscriptions(Collection subscriptionColl) { + subscriptions.addAll(subscriptionColl); return this; } @@ -237,31 +245,27 @@ public final class MqttMessageBuilders { MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions); return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload); } - - private void ensureSubscriptionsExist() { - if (subscriptions == null) { - subscriptions = new ArrayList<>(5); - } - } } public static final class UnsubscribeBuilder { - - private List topicFilters; + private final List topicFilters; private int messageId; private MqttProperties properties; UnsubscribeBuilder() { + topicFilters = new ArrayList<>(5); } public UnsubscribeBuilder addTopicFilter(String topic) { - if (topicFilters == null) { - topicFilters = new ArrayList<>(5); - } topicFilters.add(topic); return this; } + public UnsubscribeBuilder addTopicFilters(Collection topicColl) { + topicFilters.addAll(topicColl); + return this; + } + public UnsubscribeBuilder messageId(int messageId) { this.messageId = messageId; return this; diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index 4773336..1f74483 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -197,11 +197,11 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { return; } if (logger.isInfoEnabled()) { - logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopic(), messageId); + logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopics(), messageId); } pendingUnSubscription.onUnSubAckReceived(); clientSession.removePaddingUnSubscribe(messageId); - clientSession.removeSubscriptions(pendingUnSubscription.getTopic()); + clientSession.removeSubscriptions(pendingUnSubscription.getTopics()); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java index c824746..d06a8c1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java @@ -116,8 +116,8 @@ public final class DefaultMqttClientSession implements IMqttClientSession { } @Override - public void removeSubscriptions(String topicFilter) { - subscriptions.remove(topicFilter); + public void removeSubscriptions(List topicFilters) { + topicFilters.forEach(subscriptions::remove); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java index 3b37cd0..7c53a9a 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java @@ -88,9 +88,9 @@ public interface IMqttClientSession { /** * 删除订阅过程消息 * - * @param topicFilter topicFilter + * @param topicFilters topicFilter 集合 */ - void removeSubscriptions(String topicFilter); + void removeSubscriptions(List topicFilters); /** * 添加取消订阅过程消息 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index fe52d54..bc263b1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -25,6 +25,8 @@ import org.tio.client.TioClient; import org.tio.core.Tio; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -122,18 +124,28 @@ public final class MqttClient { /** * 取消订阅 * - * @param topicFilter topicFilter + * @param topicFilters topicFilter 集合 + * @return MqttClient + */ + public MqttClient unSubscribe(String... topicFilters) { + return unSubscribe(Arrays.asList(topicFilters)); + } + + /** + * 取消订阅 + * + * @param topicFilters topicFilter 集合 * @return MqttClient */ - public MqttClient unSubscribe(String topicFilter) { + public MqttClient unSubscribe(List topicFilters) { int messageId = MqttClientMessageId.getId(); MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe() - .addTopicFilter(topicFilter) + .addTopicFilters(topicFilters) .messageId(messageId) .build(); - MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message); + MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilters, message); Boolean result = Tio.send(context, message); - logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilter, messageId, result); + logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilters, messageId, result); // 解绑 subManage listener clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription); pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg)); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java index ce4065f..5f8e348 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java @@ -4,6 +4,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessage; import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage; import net.dreamlu.iot.mqtt.core.common.RetryProcessor; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Consumer; @@ -11,16 +12,16 @@ import java.util.function.Consumer; * MqttPendingSubscription,参考于 netty-mqtt-client */ final class MqttPendingUnSubscription { - private final String topic; + private final List topics; private final RetryProcessor retryProcessor = new RetryProcessor<>(); - MqttPendingUnSubscription(String topic, MqttUnsubscribeMessage unSubscribeMessage) { - this.topic = topic; + MqttPendingUnSubscription(List topics, MqttUnsubscribeMessage unSubscribeMessage) { + this.topics = topics; this.retryProcessor.setOriginalMessage(unSubscribeMessage); } - protected String getTopic() { - return topic; + protected List getTopics() { + return topics; } protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) { -- GitLab