提交 e82145d6 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mqtt 自定义接口的异常处理

上级 ee894a28
......@@ -16,6 +16,8 @@
package net.dreamlu.iot.mqtt.core.server.auth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
......@@ -25,6 +27,26 @@ import org.tio.core.ChannelContext;
*/
@FunctionalInterface
public interface IMqttServerAuthHandler {
Logger logger = LoggerFactory.getLogger(IMqttServerAuthHandler.class);
/**
* 认证
*
* @param context ChannelContext
* @param uniqueId mqtt 内唯一id,默认和 clientId 相同
* @param clientId 客户端 ID
* @param userName 用户名
* @param password 密码
* @return 是否认证成功
*/
default boolean verifyAuthenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
try {
return authenticate(context, uniqueId, clientId, userName, password);
} catch (Throwable e) {
logger.error("Mqtt authenticate validator error", e);
return false;
}
}
/**
* 认证
......
......@@ -17,6 +17,8 @@
package net.dreamlu.iot.mqtt.core.server.auth;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
......@@ -25,6 +27,26 @@ import org.tio.core.ChannelContext;
* @author L.cm
*/
public interface IMqttServerPublishPermission {
Logger logger = LoggerFactory.getLogger(IMqttServerPublishPermission.class);
/**
* 否有发布权限
*
* @param context ChannelContext
* @param clientId 客户端 id
* @param topic topic
* @param qoS MqttQoS
* @param isRetain 是否保留消息
* @return 否有发布权限
*/
default boolean verifyPermission(ChannelContext context, String clientId, String topic, MqttQoS qoS, boolean isRetain) {
try {
return hasPermission(context, clientId, topic, qoS, isRetain);
} catch (Throwable e) {
logger.error("Mqtt publish permission validator error", e);
return false;
}
}
/**
* 否有发布权限
......
......@@ -17,6 +17,8 @@
package net.dreamlu.iot.mqtt.core.server.auth;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
......@@ -25,6 +27,25 @@ import org.tio.core.ChannelContext;
* @author L.cm
*/
public interface IMqttServerSubscribeValidator {
Logger logger = LoggerFactory.getLogger(IMqttServerSubscribeValidator.class);
/**
* 校验订阅的 topicFilter
*
* @param context ChannelContext
* @param clientId clientId
* @param topicFilter topicFilter
* @param qoS MqttQoS
* @return 是否有权限
*/
default boolean verifyTopicFilter(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) {
try {
return isValid(context, clientId, topicFilter, qoS);
} catch (Throwable e) {
logger.error("Mqtt subscribe validator error", e);
return false;
}
}
/**
* 是否可以订阅
......
......@@ -103,7 +103,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
return;
}
// 3. 认证
if (!authHandler.authenticate(context, uniqueId, clientId, userName, password)) {
if (!authHandler.verifyAuthenticate(context, uniqueId, clientId, userName, password)) {
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
......@@ -157,10 +157,16 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
willMessage.setNode(nodeName);
messageStore.addWillMessage(uniqueId, willMessage);
}
// 9. 返回 ack
// 9. 在线状态
try {
connectStatusListener.online(context, uniqueId);
} catch (Throwable e) {
logger.error("mqtt connectStatusListener", e);
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return;
}
// 10. 返回 ack
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED);
// 10. 在线状态
connectStatusListener.online(context, uniqueId);
}
private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) {
......@@ -199,7 +205,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttPublishVariableHeader variableHeader = message.variableHeader();
String topicName = variableHeader.topicName();
// 1. 判断是否有发布权限,没有权限则断开 mqtt 连接 mqtt 5.x qos1、qos2 可以响应 reasonCode
if (publishPermission != null && !publishPermission.hasPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
if (publishPermission != null && !publishPermission.verifyPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
Tio.remove(context, "Mqtt clientId:" + clientId + " publish topic: " + topicName + " no permission.");
return;
}
......@@ -318,7 +324,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
String topicFilter = subscription.topicName();
MqttQoS mqttQoS = subscription.qualityOfService();
// 校验是否可以订阅
if (enableSubscribeValidator && !subscribeValidator.isValid(context, clientId, topicFilter, mqttQoS)) {
if (enableSubscribeValidator && !subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) {
grantedQosList.add(MqttQoS.FAILURE);
logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", clientId, topicFilter, mqttQoS, messageId);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册