From e82145d679536800c446c0cd63a009cee1e34000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Tue, 11 Jan 2022 20:42:28 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20mqtt=20=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E7=9A=84=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/auth/IMqttServerAuthHandler.java | 22 +++++++++++++++++++ .../auth/IMqttServerPublishPermission.java | 22 +++++++++++++++++++ .../auth/IMqttServerSubscribeValidator.java | 21 ++++++++++++++++++ .../support/DefaultMqttServerProcessor.java | 18 ++++++++++----- 4 files changed, 77 insertions(+), 6 deletions(-) diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerAuthHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerAuthHandler.java index bf51ea7..a76697e 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerAuthHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerAuthHandler.java @@ -16,6 +16,8 @@ package net.dreamlu.iot.mqtt.core.server.auth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; /** @@ -25,6 +27,26 @@ import org.tio.core.ChannelContext; */ @FunctionalInterface public interface IMqttServerAuthHandler { + Logger logger = LoggerFactory.getLogger(IMqttServerAuthHandler.class); + + /** + * 认证 + * + * @param context ChannelContext + * @param uniqueId mqtt 内唯一id,默认和 clientId 相同 + * @param clientId 客户端 ID + * @param userName 用户名 + * @param password 密码 + * @return 是否认证成功 + */ + default boolean verifyAuthenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) { + try { + return authenticate(context, uniqueId, clientId, userName, password); + } catch (Throwable e) { + logger.error("Mqtt authenticate validator error", e); + return false; + } + } /** * 认证 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerPublishPermission.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerPublishPermission.java index 3799f54..1671800 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerPublishPermission.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerPublishPermission.java @@ -17,6 +17,8 @@ package net.dreamlu.iot.mqtt.core.server.auth; import net.dreamlu.iot.mqtt.codec.MqttQoS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; /** @@ -25,6 +27,26 @@ import org.tio.core.ChannelContext; * @author L.cm */ public interface IMqttServerPublishPermission { + Logger logger = LoggerFactory.getLogger(IMqttServerPublishPermission.class); + + /** + * 否有发布权限 + * + * @param context ChannelContext + * @param clientId 客户端 id + * @param topic topic + * @param qoS MqttQoS + * @param isRetain 是否保留消息 + * @return 否有发布权限 + */ + default boolean verifyPermission(ChannelContext context, String clientId, String topic, MqttQoS qoS, boolean isRetain) { + try { + return hasPermission(context, clientId, topic, qoS, isRetain); + } catch (Throwable e) { + logger.error("Mqtt publish permission validator error", e); + return false; + } + } /** * 否有发布权限 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerSubscribeValidator.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerSubscribeValidator.java index 992e673..52fe4e7 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerSubscribeValidator.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/auth/IMqttServerSubscribeValidator.java @@ -17,6 +17,8 @@ package net.dreamlu.iot.mqtt.core.server.auth; import net.dreamlu.iot.mqtt.codec.MqttQoS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; /** @@ -25,6 +27,25 @@ import org.tio.core.ChannelContext; * @author L.cm */ public interface IMqttServerSubscribeValidator { + Logger logger = LoggerFactory.getLogger(IMqttServerSubscribeValidator.class); + + /** + * 校验订阅的 topicFilter + * + * @param context ChannelContext + * @param clientId clientId + * @param topicFilter topicFilter + * @param qoS MqttQoS + * @return 是否有权限 + */ + default boolean verifyTopicFilter(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) { + try { + return isValid(context, clientId, topicFilter, qoS); + } catch (Throwable e) { + logger.error("Mqtt subscribe validator error", e); + return false; + } + } /** * 是否可以订阅 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 0b1408b..21ff6bb 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -103,7 +103,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { return; } // 3. 认证 - if (!authHandler.authenticate(context, uniqueId, clientId, userName, password)) { + if (!authHandler.verifyAuthenticate(context, uniqueId, clientId, userName, password)) { connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); return; } @@ -157,10 +157,16 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { willMessage.setNode(nodeName); messageStore.addWillMessage(uniqueId, willMessage); } - // 9. 返回 ack + // 9. 在线状态 + try { + connectStatusListener.online(context, uniqueId); + } catch (Throwable e) { + logger.error("mqtt connectStatusListener", e); + connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + return; + } + // 10. 返回 ack connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED); - // 10. 在线状态 - connectStatusListener.online(context, uniqueId); } private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) { @@ -199,7 +205,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { MqttPublishVariableHeader variableHeader = message.variableHeader(); String topicName = variableHeader.topicName(); // 1. 判断是否有发布权限,没有权限则断开 mqtt 连接 mqtt 5.x qos1、qos2 可以响应 reasonCode - if (publishPermission != null && !publishPermission.hasPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) { + if (publishPermission != null && !publishPermission.verifyPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) { Tio.remove(context, "Mqtt clientId:" + clientId + " publish topic: " + topicName + " no permission."); return; } @@ -318,7 +324,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { String topicFilter = subscription.topicName(); MqttQoS mqttQoS = subscription.qualityOfService(); // 校验是否可以订阅 - if (enableSubscribeValidator && !subscribeValidator.isValid(context, clientId, topicFilter, mqttQoS)) { + if (enableSubscribeValidator && !subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) { grantedQosList.add(MqttQoS.FAILURE); logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", clientId, topicFilter, mqttQoS, messageId); } else { -- GitLab