提交 6b1bc029 编写于 作者: 如梦技术's avatar 如梦技术 🐛

代码完善。

上级 0333b0f3
......@@ -21,7 +21,7 @@ import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -122,12 +122,12 @@ public final class MqttServer {
logger.warn("Mqtt publish to clientId:{} ChannelContext is null May be disconnected.", clientId);
return false;
}
List<SubscribeStore> subscribeList = sessionManager.searchSubscribe(clientId, topic);
List<Subscribe> subscribeList = sessionManager.searchSubscribe(clientId, topic);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt publish but clientId:{} subscribeList is empty.", clientId);
return false;
}
for (SubscribeStore subscribe : subscribeList) {
for (Subscribe subscribe : subscribeList) {
int subMqttQoS = subscribe.getMqttQoS();
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, clientId, topic, payload, mqttQoS, retain);
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.model;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
/**
* 消息模型,用于存储
*
* @author L.cm
*/
public class Message implements Serializable {
private String clientId;
private int messageId;
private Map<String, Object> headers;
private MqttMessageType messageType;
private byte[] payload;
private long storeTime;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public int getMessageId() {
return messageId;
}
public void setMessageId(int messageId) {
this.messageId = messageId;
}
public Map<String, Object> getHeaders() {
return headers;
}
public void setHeaders(Map<String, Object> headers) {
this.headers = headers;
}
public MqttMessageType getMessageType() {
return messageType;
}
public void setMessageType(MqttMessageType messageType) {
this.messageType = messageType;
}
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
public long getStoreTime() {
return storeTime;
}
public void setStoreTime(long storeTime) {
this.storeTime = storeTime;
}
@Override
public String toString() {
return "MessageInfo{" +
"clientId='" + clientId + '\'' +
", messageId=" + messageId +
", headers=" + headers +
", messageType=" + messageType +
", payload=" + Arrays.toString(payload) +
", storeTime=" + storeTime +
'}';
}
}
......@@ -14,25 +14,25 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
package net.dreamlu.iot.mqtt.core.server.model;
import java.io.Serializable;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* 订阅存储
* 订阅模型,用于存储
*
* @author L.cm
*/
public class SubscribeStore implements Serializable {
public class Subscribe implements Serializable {
private Pattern topicRegex;
private int mqttQoS;
public SubscribeStore() {
public Subscribe() {
}
public SubscribeStore(String topicFilter, int mqttQoS) {
public Subscribe(String topicFilter, int mqttQoS) {
this.topicRegex = Pattern.compile(topicFilter.replace("+", "[^/]+").replace("#", ".+").concat("$"));
this.mqttQoS = mqttQoS;
}
......@@ -41,7 +41,7 @@ public class SubscribeStore implements Serializable {
return topicRegex;
}
public SubscribeStore setTopicRegex(Pattern topicRegex) {
public Subscribe setTopicRegex(Pattern topicRegex) {
this.topicRegex = topicRegex;
return this;
}
......@@ -50,7 +50,7 @@ public class SubscribeStore implements Serializable {
return mqttQoS;
}
public SubscribeStore setMqttQoS(int mqttQoS) {
public Subscribe setMqttQoS(int mqttQoS) {
this.mqttQoS = mqttQoS;
return this;
}
......@@ -63,7 +63,7 @@ public class SubscribeStore implements Serializable {
if (o == null || getClass() != o.getClass()) {
return false;
}
SubscribeStore that = (SubscribeStore) o;
Subscribe that = (Subscribe) o;
return mqttQoS == that.mqttQoS &&
Objects.equals(topicRegex, that.topicRegex);
}
......
......@@ -19,7 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import java.util.List;
......@@ -54,7 +54,7 @@ public interface IMqttSessionManager {
* @param topicName topicName
* @return 订阅存储列表
*/
List<SubscribeStore> searchSubscribe(String clientId, String topicName);
List<Subscribe> searchSubscribe(String clientId, String topicName);
/**
* 添加发布过程存储
......
......@@ -19,7 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
......@@ -39,7 +39,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
/**
* clientId: {topicFilter: SubscribeStore}
*/
private final ConcurrentMap<String, ConcurrentMap<String, SubscribeStore>> subscribeStore = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConcurrentMap<String, Subscribe>> subscribeStore = new ConcurrentHashMap<>();
/**
* clientId: {msgId: Object}
*/
......@@ -51,13 +51,13 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
@Override
public void addSubscribe(String clientId, String topicFilter, MqttQoS mqttQoS) {
Map<String, SubscribeStore> data = subscribeStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
data.put(topicFilter, new SubscribeStore(topicFilter, mqttQoS.value()));
Map<String, Subscribe> data = subscribeStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
data.put(topicFilter, new Subscribe(topicFilter, mqttQoS.value()));
}
@Override
public void removeSubscribe(String clientId, String topicFilter) {
ConcurrentMap<String, SubscribeStore> map = subscribeStore.get(clientId);
ConcurrentMap<String, Subscribe> map = subscribeStore.get(clientId);
if (map == null) {
return;
}
......@@ -65,14 +65,14 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
}
@Override
public List<SubscribeStore> searchSubscribe(String clientId, String topicName) {
List<SubscribeStore> list = new ArrayList<>();
ConcurrentMap<String, SubscribeStore> map = subscribeStore.get(clientId);
public List<Subscribe> searchSubscribe(String clientId, String topicName) {
List<Subscribe> list = new ArrayList<>();
ConcurrentMap<String, Subscribe> map = subscribeStore.get(clientId);
if (map == null) {
return Collections.emptyList();
}
Collection<SubscribeStore> values = map.values();
for (SubscribeStore value : values) {
Collection<Subscribe> values = map.values();
for (Subscribe value : values) {
if (value.getTopicRegex().matcher(topicName).matches()) {
list.add(value);
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import java.util.Collection;
/**
* message store
*
* @author L.cm
*/
public interface IMqttMessageStore {
/**
* 存储clientId的遗嘱消息
*
* @param clientId clientId
* @param message message
* @return boolean
*/
boolean addWillMessage(String clientId, Message message);
/**
* 清理该clientId的遗嘱消息
*
* @param clientId clientId
* @return boolean
*/
boolean clearWillMessage(String clientId);
/**
* 获取will消息
*
* @param clientId clientId
* @return Message
*/
Message getWillMessage(String clientId);
/**
* 存储retain消息
*
* @param topic topic
* @param message message
* @return boolean
*/
boolean addRetainMessage(String topic, Message message);
/**
* 清理该topic的 retain消息
*
* @param topic topic
* @return boolean
*/
boolean clearRetainMessage(String topic);
/**
* 获取所有retain消息
*
* @return Message Collection
*/
Collection<Message> getAllRetainMsg();
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* message store
*
* @author L.cm
*/
public class InMemoryMqttMessageStore implements IMqttMessageStore {
/**
* 遗嘱消息 clientId: Message
*/
private final ConcurrentMap<String, Message> willStore = new ConcurrentHashMap<>();
/**
* 保持消息 clientId: Message
*/
private final ConcurrentMap<String, Message> retainStore = new ConcurrentHashMap<>();
@Override
public boolean addWillMessage(String clientId, Message message) {
willStore.put(clientId, message);
return true;
}
@Override
public boolean clearWillMessage(String clientId) {
willStore.remove(clientId);
return true;
}
@Override
public Message getWillMessage(String clientId) {
return willStore.get(clientId);
}
@Override
public boolean addRetainMessage(String topic, Message message) {
retainStore.put(topic, message);
return true;
}
@Override
public boolean clearRetainMessage(String topic) {
retainStore.remove(topic);
return true;
}
@Override
public Collection<Message> getAllRetainMsg() {
return retainStore.values();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册