diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java index 1e77beca3d9faa3bd4896d1121e9655b62918a49..d256f9a950077d2f357b50e09cae5d0fe0535a56 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttMessageBuilders.java @@ -21,6 +21,7 @@ import net.dreamlu.iot.mqtt.codec.MqttProperties.MqttPropertyType; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -200,22 +201,29 @@ public final class MqttMessageBuilders { public static final class SubscribeBuilder { - private List subscriptions; + private final List subscriptions; private int messageId; private MqttProperties properties; SubscribeBuilder() { + subscriptions = new ArrayList<>(5); } - public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { - ensureSubscriptionsExist(); - subscriptions.add(new MqttTopicSubscription(topic, qos)); + public SubscribeBuilder addSubscription(MqttTopicSubscription subscription) { + subscriptions.add(subscription); return this; } + public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { + return addSubscription(new MqttTopicSubscription(topic, qos)); + } + public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) { - ensureSubscriptionsExist(); - subscriptions.add(new MqttTopicSubscription(topic, option)); + return addSubscription(new MqttTopicSubscription(topic, option)); + } + + public SubscribeBuilder addSubscriptions(Collection subscriptionColl) { + subscriptions.addAll(subscriptionColl); return this; } @@ -237,31 +245,27 @@ public final class MqttMessageBuilders { MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions); return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload); } - - private void ensureSubscriptionsExist() { - if (subscriptions == null) { - subscriptions = new ArrayList<>(5); - } - } } public static final class UnsubscribeBuilder { - - private List topicFilters; + private final List topicFilters; private int messageId; private MqttProperties properties; UnsubscribeBuilder() { + topicFilters = new ArrayList<>(5); } public UnsubscribeBuilder addTopicFilter(String topic) { - if (topicFilters == null) { - topicFilters = new ArrayList<>(5); - } topicFilters.add(topic); return this; } + public UnsubscribeBuilder addTopicFilters(Collection topicColl) { + topicFilters.addAll(topicColl); + return this; + } + public UnsubscribeBuilder messageId(int messageId) { this.messageId = messageId; return this; diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index 4773336609ef4bd3b02374566f3b4acde27f77a2..1f7448386860fa58e2373b09c32d5f7013f8c579 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -197,11 +197,11 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { return; } if (logger.isInfoEnabled()) { - logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopic(), messageId); + logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopics(), messageId); } pendingUnSubscription.onUnSubAckReceived(); clientSession.removePaddingUnSubscribe(messageId); - clientSession.removeSubscriptions(pendingUnSubscription.getTopic()); + clientSession.removeSubscriptions(pendingUnSubscription.getTopics()); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java index c824746f128288f4cae2ca78e50e241b859e0db1..d06a8c1808591a0deea53d37a8013bdd7be6a5ea 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientSession.java @@ -116,8 +116,8 @@ public final class DefaultMqttClientSession implements IMqttClientSession { } @Override - public void removeSubscriptions(String topicFilter) { - subscriptions.remove(topicFilter); + public void removeSubscriptions(List topicFilters) { + topicFilters.forEach(subscriptions::remove); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java index 3b37cd0b5ee22d9ff2a5e926683d1c61689eaa5a..7c53a9a77de3be2f1b1e8ad007a03ca6e17458b4 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientSession.java @@ -88,9 +88,9 @@ public interface IMqttClientSession { /** * 删除订阅过程消息 * - * @param topicFilter topicFilter + * @param topicFilters topicFilter 集合 */ - void removeSubscriptions(String topicFilter); + void removeSubscriptions(List topicFilters); /** * 添加取消订阅过程消息 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index fe52d549449256b6f50dc81648cbbb14f453db64..bc263b17507e379b592142ed9a4df7e7d7c30ebc 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -25,6 +25,8 @@ import org.tio.client.TioClient; import org.tio.core.Tio; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -122,18 +124,28 @@ public final class MqttClient { /** * 取消订阅 * - * @param topicFilter topicFilter + * @param topicFilters topicFilter 集合 + * @return MqttClient + */ + public MqttClient unSubscribe(String... topicFilters) { + return unSubscribe(Arrays.asList(topicFilters)); + } + + /** + * 取消订阅 + * + * @param topicFilters topicFilter 集合 * @return MqttClient */ - public MqttClient unSubscribe(String topicFilter) { + public MqttClient unSubscribe(List topicFilters) { int messageId = MqttClientMessageId.getId(); MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe() - .addTopicFilter(topicFilter) + .addTopicFilters(topicFilters) .messageId(messageId) .build(); - MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message); + MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilters, message); Boolean result = Tio.send(context, message); - logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilter, messageId, result); + logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilters, messageId, result); // 解绑 subManage listener clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription); pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg)); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java index ce4065f4c3fb487729a31c22f23d19a66a0599b6..5f8e3481714d40430d8eba73895ce6a2344bf8f4 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java @@ -4,6 +4,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessage; import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage; import net.dreamlu.iot.mqtt.core.common.RetryProcessor; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Consumer; @@ -11,16 +12,16 @@ import java.util.function.Consumer; * MqttPendingSubscription,参考于 netty-mqtt-client */ final class MqttPendingUnSubscription { - private final String topic; + private final List topics; private final RetryProcessor retryProcessor = new RetryProcessor<>(); - MqttPendingUnSubscription(String topic, MqttUnsubscribeMessage unSubscribeMessage) { - this.topic = topic; + MqttPendingUnSubscription(List topics, MqttUnsubscribeMessage unSubscribeMessage) { + this.topics = topics; this.retryProcessor.setOriginalMessage(unSubscribeMessage); } - protected String getTopic() { - return topic; + protected List getTopics() { + return topics; } protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) {