提交 69abe7aa 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt-client 支持共享订阅

上级 608a7f79
......@@ -18,11 +18,10 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import net.dreamlu.iot.mqtt.core.common.TopicFilterType;
import java.io.Serializable;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* 发送订阅,未 ack 前的数据承载
......@@ -32,7 +31,7 @@ import java.util.regex.Pattern;
public final class MqttClientSubscription implements Serializable {
private final String topicFilter;
private final MqttQoS mqttQoS;
private final Pattern topicRegex;
private final TopicFilterType type;
private final IMqttClientMessageListener listener;
public MqttClientSubscription(MqttQoS mqttQoS,
......@@ -40,7 +39,7 @@ public final class MqttClientSubscription implements Serializable {
IMqttClientMessageListener listener) {
this.mqttQoS = Objects.requireNonNull(mqttQoS, "MQTT subscribe mqttQoS is null.");
this.topicFilter = Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null.");
this.topicRegex = TopicUtil.getTopicPattern(topicFilter);
this.type = TopicFilterType.getType(topicFilter);
this.listener = Objects.requireNonNull(listener, "MQTT subscribe listener is null.");
}
......@@ -57,7 +56,7 @@ public final class MqttClientSubscription implements Serializable {
}
public boolean matches(String topic) {
return this.topicRegex.matcher(topic).matches();
return this.type.match(this.topicFilter, topic);
}
public MqttTopicSubscription toTopicSubscription() {
......@@ -88,7 +87,6 @@ public final class MqttClientSubscription implements Serializable {
return "MqttClientSubscription{" +
"topicFilter='" + topicFilter + '\'' +
", mqttQoS=" + mqttQoS +
", topicRegex=" + topicRegex +
'}';
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
/**
* TopicFilter 类型
*
* @author L.cm
*/
public enum TopicFilterType {
/**
* 默认 TopicFilter
*/
NONE {
@Override
public boolean match(String topicFilter, String topicName) {
return TopicUtil.match(topicFilter, topicName);
}
},
/**
* $queue/ 为前缀的共享订阅是不带群组的共享订阅
*/
QUEUE {
@Override
public boolean match(String topicFilter, String topicName) {
int prefixLen = TopicFilterType.SHARE_QUEUE_PREFIX.length();
if (topicName.startsWith("/")) {
prefixLen = prefixLen - 1;
}
return TopicUtil.match(topicFilter.substring(prefixLen), topicName);
}
},
/**
* $share/<group-name>/ 为前缀的共享订阅是带群组的共享订阅
*/
SHARE {
@Override
public boolean match(String topicFilter, String topicName) {
String shareTopicFilter = topicFilter.substring(TopicFilterType.SHARE_GROUP_PREFIX.length());
String[] group = shareTopicFilter.split("/");
String groupName = group[0];
String shareTopicPrefix = TopicFilterType.SHARE_GROUP_PREFIX + groupName + '/';
int prefixLen = shareTopicPrefix.length();
if (topicName.startsWith("/")) {
prefixLen = prefixLen - 1;
}
return TopicUtil.match(topicFilter.substring(prefixLen), topicName);
}
};
/**
* 共享订阅的 topic
*/
public static final String SHARE_QUEUE_PREFIX = "$queue/";
public static final String SHARE_GROUP_PREFIX = "$share/";
/**
* 判断 topicFilter 和 topicName 匹配情况
*
* @param topicFilter topicFilter
* @param topicName topicName
* @return 是否匹配
*/
public abstract boolean match(String topicFilter, String topicName);
/**
* 获取 topicFilter 类型
*
* @param topicFilter topicFilter
* @return TopicFilterType
*/
public static TopicFilterType getType(String topicFilter) {
if (topicFilter.startsWith(TopicFilterType.SHARE_QUEUE_PREFIX)) {
return TopicFilterType.QUEUE;
} else if (topicFilter.startsWith(TopicFilterType.SHARE_GROUP_PREFIX)) {
return TopicFilterType.SHARE;
} else {
return TopicFilterType.NONE;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册