提交 a37f17b5 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt server 添加发布权限接口。

上级 698fe55f
......@@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
......@@ -100,6 +101,10 @@ public class MqttServerCreator {
* 订阅校验器
*/
private IMqttServerSubscribeValidator subscribeValidator;
/**
* 发布权限校验
*/
private IMqttServerPublishPermission publishPermission;
/**
* 消息处理器
*/
......@@ -277,6 +282,15 @@ public class MqttServerCreator {
return this;
}
public IMqttServerPublishPermission getPublishPermission() {
return publishPermission;
}
public MqttServerCreator publishPermission(IMqttServerPublishPermission publishPermission) {
this.publishPermission = publishPermission;
return this;
}
public IMqttMessageDispatcher getMessageDispatcher() {
return messageDispatcher;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.auth;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.tio.core.ChannelContext;
/**
* mqtt 服务端校验客户端是否有发布权限
*
* @author L.cm
*/
public interface IMqttServerPublishPermission {
/**
* 否有发布权限
*
* @param context ChannelContext
* @param clientId 客户端 id
* @param topic topic
* @param qoS MqttQoS
* @return 否有发布权限
*/
boolean hasPermission(ChannelContext context, String clientId, String topic, MqttQoS qoS);
}
......@@ -23,6 +23,7 @@ import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
......@@ -51,6 +52,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/
public class DefaultMqttServerProcessor implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
/**
* 默认的超时时间
*/
private static final long DEFAULT_HEARTBEAT_TIMEOUT = 120_000L;
/**
* 2 倍客户端 keepAlive 时间
*/
......@@ -61,6 +66,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final IMqttServerAuthHandler authHandler;
private final IMqttServerUniqueIdService uniqueIdService;
private final IMqttServerSubscribeValidator subscribeValidator;
private final IMqttServerPublishPermission publishPermission;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
private final IMqttMessageListener messageListener;
......@@ -68,12 +74,13 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? 120_000L : serverCreator.getHeartbeatTimeout();
this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? DEFAULT_HEARTBEAT_TIMEOUT : serverCreator.getHeartbeatTimeout();
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.authHandler = serverCreator.getAuthHandler();
this.uniqueIdService = serverCreator.getUniqueIdService();
this.subscribeValidator = serverCreator.getSubscribeValidator();
this.publishPermission = serverCreator.getPublishPermission();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.messageListener = serverCreator.getMessageListener();
......@@ -191,6 +198,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttQoS mqttQoS = fixedHeader.qosLevel();
MqttPublishVariableHeader variableHeader = message.variableHeader();
String topicName = variableHeader.topicName();
// 1. 判断是否有发布权限
if (publishPermission != null && !publishPermission.hasPermission(context, clientId, topicName, mqttQoS)) {
return;
}
// 2. 处理发布逻辑
int packetId = variableHeader.packetId();
logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
switch (mqttQoS) {
......
......@@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.spring.server;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
......@@ -56,6 +57,7 @@ public class MqttServerConfiguration {
ObjectProvider<IMqttServerAuthHandler> authHandlerObjectProvider,
ObjectProvider<IMqttServerUniqueIdService> uniqueIdServiceObjectProvider,
ObjectProvider<IMqttServerSubscribeValidator> subscribeValidatorObjectProvider,
ObjectProvider<IMqttServerPublishPermission> publishPermissionObjectProvider,
ObjectProvider<IMqttMessageDispatcher> messageDispatcherObjectProvider,
ObjectProvider<IMqttMessageStore> messageStoreObjectProvider,
ObjectProvider<IMqttSessionManager> sessionManagerObjectProvider,
......@@ -102,6 +104,8 @@ public class MqttServerConfiguration {
uniqueIdServiceObjectProvider.ifAvailable(serverCreator::uniqueIdService);
// 订阅校验
subscribeValidatorObjectProvider.ifAvailable(serverCreator::subscribeValidator);
// 订阅权限校验
publishPermissionObjectProvider.ifAvailable(serverCreator::publishPermission);
// 消息转发
messageDispatcherObjectProvider.ifAvailable(serverCreator::messageDispatcher);
// 消息存储
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册