提交 ca44c630 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client 添加批量取消订阅。

上级 89016e67
...@@ -21,6 +21,7 @@ import net.dreamlu.iot.mqtt.codec.MqttProperties.MqttPropertyType; ...@@ -21,6 +21,7 @@ import net.dreamlu.iot.mqtt.codec.MqttProperties.MqttPropertyType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
...@@ -200,22 +201,29 @@ public final class MqttMessageBuilders { ...@@ -200,22 +201,29 @@ public final class MqttMessageBuilders {
public static final class SubscribeBuilder { public static final class SubscribeBuilder {
private List<MqttTopicSubscription> subscriptions; private final List<MqttTopicSubscription> subscriptions;
private int messageId; private int messageId;
private MqttProperties properties; private MqttProperties properties;
SubscribeBuilder() { SubscribeBuilder() {
subscriptions = new ArrayList<>(5);
} }
public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { public SubscribeBuilder addSubscription(MqttTopicSubscription subscription) {
ensureSubscriptionsExist(); subscriptions.add(subscription);
subscriptions.add(new MqttTopicSubscription(topic, qos));
return this; return this;
} }
public SubscribeBuilder addSubscription(MqttQoS qos, String topic) {
return addSubscription(new MqttTopicSubscription(topic, qos));
}
public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) { public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) {
ensureSubscriptionsExist(); return addSubscription(new MqttTopicSubscription(topic, option));
subscriptions.add(new MqttTopicSubscription(topic, option)); }
public SubscribeBuilder addSubscriptions(Collection<MqttTopicSubscription> subscriptionColl) {
subscriptions.addAll(subscriptionColl);
return this; return this;
} }
...@@ -237,31 +245,27 @@ public final class MqttMessageBuilders { ...@@ -237,31 +245,27 @@ public final class MqttMessageBuilders {
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions); MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions);
return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload); return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
} }
private void ensureSubscriptionsExist() {
if (subscriptions == null) {
subscriptions = new ArrayList<>(5);
}
}
} }
public static final class UnsubscribeBuilder { public static final class UnsubscribeBuilder {
private final List<String> topicFilters;
private List<String> topicFilters;
private int messageId; private int messageId;
private MqttProperties properties; private MqttProperties properties;
UnsubscribeBuilder() { UnsubscribeBuilder() {
topicFilters = new ArrayList<>(5);
} }
public UnsubscribeBuilder addTopicFilter(String topic) { public UnsubscribeBuilder addTopicFilter(String topic) {
if (topicFilters == null) {
topicFilters = new ArrayList<>(5);
}
topicFilters.add(topic); topicFilters.add(topic);
return this; return this;
} }
public UnsubscribeBuilder addTopicFilters(Collection<String> topicColl) {
topicFilters.addAll(topicColl);
return this;
}
public UnsubscribeBuilder messageId(int messageId) { public UnsubscribeBuilder messageId(int messageId) {
this.messageId = messageId; this.messageId = messageId;
return this; return this;
......
...@@ -197,11 +197,11 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -197,11 +197,11 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
return; return;
} }
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopic(), messageId); logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopics(), messageId);
} }
pendingUnSubscription.onUnSubAckReceived(); pendingUnSubscription.onUnSubAckReceived();
clientSession.removePaddingUnSubscribe(messageId); clientSession.removePaddingUnSubscribe(messageId);
clientSession.removeSubscriptions(pendingUnSubscription.getTopic()); clientSession.removeSubscriptions(pendingUnSubscription.getTopics());
} }
@Override @Override
......
...@@ -116,8 +116,8 @@ public final class DefaultMqttClientSession implements IMqttClientSession { ...@@ -116,8 +116,8 @@ public final class DefaultMqttClientSession implements IMqttClientSession {
} }
@Override @Override
public void removeSubscriptions(String topicFilter) { public void removeSubscriptions(List<String> topicFilters) {
subscriptions.remove(topicFilter); topicFilters.forEach(subscriptions::remove);
} }
@Override @Override
......
...@@ -88,9 +88,9 @@ public interface IMqttClientSession { ...@@ -88,9 +88,9 @@ public interface IMqttClientSession {
/** /**
* 删除订阅过程消息 * 删除订阅过程消息
* *
* @param topicFilter topicFilter * @param topicFilters topicFilter 集合
*/ */
void removeSubscriptions(String topicFilter); void removeSubscriptions(List<String> topicFilters);
/** /**
* 添加取消订阅过程消息 * 添加取消订阅过程消息
......
...@@ -25,6 +25,8 @@ import org.tio.client.TioClient; ...@@ -25,6 +25,8 @@ import org.tio.client.TioClient;
import org.tio.core.Tio; import org.tio.core.Tio;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
...@@ -122,18 +124,28 @@ public final class MqttClient { ...@@ -122,18 +124,28 @@ public final class MqttClient {
/** /**
* 取消订阅 * 取消订阅
* *
* @param topicFilter topicFilter * @param topicFilters topicFilter 集合
* @return MqttClient
*/
public MqttClient unSubscribe(String... topicFilters) {
return unSubscribe(Arrays.asList(topicFilters));
}
/**
* 取消订阅
*
* @param topicFilters topicFilter 集合
* @return MqttClient * @return MqttClient
*/ */
public MqttClient unSubscribe(String topicFilter) { public MqttClient unSubscribe(List<String> topicFilters) {
int messageId = MqttClientMessageId.getId(); int messageId = MqttClientMessageId.getId();
MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe() MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe()
.addTopicFilter(topicFilter) .addTopicFilters(topicFilters)
.messageId(messageId) .messageId(messageId)
.build(); .build();
MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message); MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilters, message);
Boolean result = Tio.send(context, message); Boolean result = Tio.send(context, message);
logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilter, messageId, result); logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilters, messageId, result);
// 解绑 subManage listener // 解绑 subManage listener
clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription); clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription);
pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg)); pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg));
......
...@@ -4,6 +4,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessage; ...@@ -4,6 +4,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage; import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor; import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer; import java.util.function.Consumer;
...@@ -11,16 +12,16 @@ import java.util.function.Consumer; ...@@ -11,16 +12,16 @@ import java.util.function.Consumer;
* MqttPendingSubscription,参考于 netty-mqtt-client * MqttPendingSubscription,参考于 netty-mqtt-client
*/ */
final class MqttPendingUnSubscription { final class MqttPendingUnSubscription {
private final String topic; private final List<String> topics;
private final RetryProcessor<MqttUnsubscribeMessage> retryProcessor = new RetryProcessor<>(); private final RetryProcessor<MqttUnsubscribeMessage> retryProcessor = new RetryProcessor<>();
MqttPendingUnSubscription(String topic, MqttUnsubscribeMessage unSubscribeMessage) { MqttPendingUnSubscription(List<String> topics, MqttUnsubscribeMessage unSubscribeMessage) {
this.topic = topic; this.topics = topics;
this.retryProcessor.setOriginalMessage(unSubscribeMessage); this.retryProcessor.setOriginalMessage(unSubscribeMessage);
} }
protected String getTopic() { protected List<String> getTopics() {
return topic; return topics;
} }
protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) { protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册