提交 4594da4d 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mica-mqtt server 添加消息拦截器,gitee #I5KLST

上级 487d8151
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.core.server.interceptor.IMqttMessageInterceptor;
import org.tio.core.ChannelContext;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt 消息拦截器集合
*
* @since 1.3.9
* @author L.cm
*/
public class MqttMessageInterceptors {
private final List<IMqttMessageInterceptor> interceptors;
public MqttMessageInterceptors() {
this.interceptors = new ArrayList<>();
}
/**
* 添加拦截器
*
* @param interceptor IMqttMessageInterceptor
*/
void add(IMqttMessageInterceptor interceptor) {
this.interceptors.add(interceptor);
}
/**
* 接收到TCP层传过来的数据后
*
* @param context ChannelContext
* @param receivedBytes 本次接收了多少字节
* @throws Exception Exception
*/
public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterReceivedBytes(context, receivedBytes);
}
}
/**
* 解码成功后触发本方法
*
* @param context ChannelContext
* @param message MqttMessage
* @param packetSize packetSize
*/
public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterDecoded(context, message, packetSize);
}
}
/**
* 处理一个消息包后
*
* @param context ChannelContext
* @param message MqttMessage
* @param cost 本次处理消息耗时,单位:毫秒
* @throws Exception Exception
*/
public void onAfterHandled(ChannelContext context, MqttMessage message, long cost) throws Exception {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterHandled(context, message, cost);
}
}
}
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttHttpHelper;
......@@ -43,6 +44,7 @@ public class MqttServerAioListener extends DefaultAioListener {
private final IMqttSessionManager sessionManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
private final MqttMessageInterceptors messageInterceptors;
private final ThreadPoolExecutor executor;
public MqttServerAioListener(MqttServerCreator serverCreator, ThreadPoolExecutor executor) {
......@@ -50,6 +52,7 @@ public class MqttServerAioListener extends DefaultAioListener {
this.sessionManager = serverCreator.getSessionManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.messageInterceptors = serverCreator.getMessageInterceptors();
this.executor = executor;
}
......@@ -142,4 +145,23 @@ public class MqttServerAioListener extends DefaultAioListener {
}
}
@Override
public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
messageInterceptors.onAfterReceivedBytes(context, receivedBytes);
}
@Override
public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) {
if (packet instanceof MqttMessage) {
messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, packetSize);
}
}
@Override
public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception {
if (packet instanceof MqttMessage) {
messageInterceptors.onAfterHandled(context, (MqttMessage) packet, cost);
}
}
}
......@@ -29,6 +29,7 @@ import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttSessionListener;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.interceptor.IMqttMessageInterceptor;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
......@@ -195,7 +196,10 @@ public class MqttServerCreator {
* TioConfig 自定义配置
*/
private Consumer<TioConfig> tioConfigCustomize;
/**
* 消息拦截器
*/
private final MqttMessageInterceptors messageInterceptors = new MqttMessageInterceptors();
public String getName() {
return name;
}
......@@ -506,6 +510,15 @@ public class MqttServerCreator {
return this;
}
public MqttMessageInterceptors getMessageInterceptors() {
return messageInterceptors;
}
public MqttServerCreator addInterceptor(IMqttMessageInterceptor interceptor) {
this.messageInterceptors.add(interceptor);
return this;
}
public MqttServer build() {
// 默认的节点名称,用于集群
if (StrUtil.isBlank(this.nodeName)) {
......
......@@ -19,7 +19,10 @@ package net.dreamlu.iot.mqtt.core.server.http.websocket;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.WriteBuffer;
import net.dreamlu.iot.mqtt.core.server.MqttMessageInterceptors;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
......@@ -39,6 +42,7 @@ import java.nio.ByteBuffer;
* @author L.cm
*/
public class MqttWsMsgHandler implements IWsMsgHandler {
private static final Logger logger = LoggerFactory.getLogger(MqttWsMsgHandler.class);
/**
* mqtt websocket message body key
*/
......@@ -52,6 +56,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
*/
private final String[] supportedSubProtocols;
private final AioHandler mqttServerAioHandler;
private final MqttMessageInterceptors messageInterceptors;
public MqttWsMsgHandler(MqttServerCreator serverCreator, AioHandler aioHandler) {
this(serverCreator, new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler);
......@@ -63,6 +68,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
this.serverCreator = serverCreator;
this.supportedSubProtocols = supportedSubProtocols;
this.mqttServerAioHandler = aioHandler;
this.messageInterceptors = serverCreator.getMessageInterceptors();
}
@Override
......@@ -108,7 +114,8 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
// 可能会一次有多个包,所以需要进行拆包
while (buffer.hasRemaining()) {
// 解析 mqtt 消息
Packet packet = mqttServerAioHandler.decode(buffer, 0, 0, buffer.remaining(), context);
int readableLength = buffer.remaining();
Packet packet = mqttServerAioHandler.decode(buffer, 0, 0, readableLength, context);
if (packet == null) {
// 如果拆包之后还有剩余,写回到 WriteBuffer
int remaining = buffer.remaining();
......@@ -119,7 +126,20 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
}
return null;
}
// 消息解析后
try {
messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, readableLength);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// 消息处理
mqttServerAioHandler.handler(packet, context);
// 消息处理后
try {
messageInterceptors.onAfterHandled(context, (MqttMessage) packet, readableLength);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.interceptor;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import org.tio.core.ChannelContext;
/**
* mqtt 消息拦截器
*
* @since 1.3.9
* @author L.cm
*/
public interface IMqttMessageInterceptor {
/**
* 接收到TCP层传过来的数据后
*
* @param context ChannelContext
* @param receivedBytes 本次接收了多少字节
* @throws Exception Exception
*/
default void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
}
/**
* 解码成功后触发本方法
*
* @param context ChannelContext
* @param message MqttMessage
* @param packetSize packetSize
*/
default void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) {
}
/**
* 处理一个消息包后
*
* @param context ChannelContext
* @param message MqttMessage
* @param cost 本次处理消息耗时,单位:毫秒
* @throws Exception Exception
*/
default void onAfterHandled(ChannelContext context, MqttMessage message, long cost) throws Exception {
}
}
......@@ -55,6 +55,7 @@ mqtt:
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| AbstractMqttMessageDispatcher | 集群是,单机否 | 消息转发,(遗嘱、保留消息转发) |
| IpStatListener | 否 | t-io ip 状态监听 |
| IMqttMessageInterceptor | 否 | 消息烂机器,1.3.9 新增 |
### 2.3 IMqttMessageListener (用于监听客户端上传的消息) 使用示例
......
......@@ -26,6 +26,7 @@ import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttSessionListener;
import net.dreamlu.iot.mqtt.core.server.interceptor.IMqttMessageInterceptor;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
......@@ -83,6 +84,7 @@ public class MqttServerConfiguration {
ObjectProvider<IMqttMessageListener> messageListenerObjectProvider,
ObjectProvider<IMqttConnectStatusListener> connectStatusListenerObjectProvider,
ObjectProvider<IpStatListener> ipStatListenerObjectProvider,
ObjectProvider<IMqttMessageInterceptor> messageInterceptorObjectProvider,
ObjectProvider<MqttServerCustomizer> customizers) {
MqttServerCreator serverCreator = MqttServer.create()
.name(properties.getName())
......@@ -142,6 +144,8 @@ public class MqttServerConfiguration {
connectStatusListenerObjectProvider.ifAvailable(serverCreator::connectStatusListener);
// ip 状态监听
ipStatListenerObjectProvider.ifAvailable(serverCreator::ipStatListener);
// 消息监听器
messageInterceptorObjectProvider.orderedStream().forEach(serverCreator::addInterceptor);
// 自定义处理
customizers.ifAvailable((customizer) -> customizer.customize(serverCreator));
return serverCreator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册