提交 54837baf 编写于 作者: 如梦技术's avatar 如梦技术 🐛

优化 mqtt 解码异常处理。

上级 c883282f
......@@ -17,6 +17,8 @@
package net.dreamlu.iot.mqtt.codec;
import org.tio.core.ChannelContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.exception.TioDecodeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -54,7 +56,21 @@ public final class MqttDecoder {
this.maxClientIdLength = maxClientIdLength;
}
public MqttMessage decode(ChannelContext ctx, ByteBuffer buffer, int limit, int position, int readableLength) {
public MqttMessage doDecode(ChannelContext ctx, ByteBuffer buffer, int limit, int position, int readableLength) throws TioDecodeException {
// 1. 半包
MqttMessage message = decode(ctx, buffer, limit, position, readableLength);
if (message == null) {
return null;
}
// 2. 解码异常
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure() && decoderResult.getCause() instanceof DecoderException) {
throw new AioDecodeException(decoderResult.getCause());
}
return message;
}
private MqttMessage decode(ChannelContext ctx, ByteBuffer buffer, int limit, int position, int readableLength) {
// 1. 首先判断缓存中协议头是否读完(MQTT协议头为2字节)
if (readableLength < MQTT_PROTOCOL_LENGTH) {
return null;
......
......@@ -20,7 +20,6 @@ import net.dreamlu.iot.mqtt.codec.*;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
......@@ -52,15 +51,7 @@ public class MqttClientAioHandler implements ClientAioHandler {
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException {
MqttMessage message = mqttDecoder.decode(context, buffer, limit, position, readableLength);
if (message == null) {
return null;
}
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure() && decoderResult.getCause() instanceof DecoderException) {
throw new AioDecodeException(decoderResult.getCause());
}
return message;
return mqttDecoder.doDecode(context, buffer, limit, position, readableLength);
}
@Override
......
......@@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.AcceptCompletionHandler;
......@@ -61,15 +60,7 @@ public class MqttServerAioHandler implements ServerAioHandler {
*/
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException {
MqttMessage message = mqttDecoder.decode(context, buffer, limit, position, readableLength);
if (message == null) {
return null;
}
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure() && decoderResult.getCause() instanceof DecoderException) {
throw new AioDecodeException(decoderResult.getCause());
}
return message;
return mqttDecoder.doDecode(context, buffer, limit, position, readableLength);
}
/**
......
......@@ -56,19 +56,26 @@ public class MqttServerAioListener extends DefaultAioListener {
@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
// 1. 业务 id
String clientId = context.getBsId();
// 2. 判断是否正常断开
boolean isNotNormalDisconnect = context.get(MqttConst.DIS_CONNECTED) == null;
if (isNotNormalDisconnect || throwable != null) {
logger.error("Mqtt server close clientId isBlank, remark:{} isRemove:{}", remark, isRemove, throwable);
} else {
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
}
// 3. 业务 id 不能为空
if (StrUtil.isBlank(clientId)) {
logger.warn("Mqtt server close clientId isBlank, remark:{} isRemove:{}", remark, isRemove);
return;
}
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
// 1. 对于异常断开连接,处理遗嘱消息
// 4. 对于异常断开连接,处理遗嘱消息
sendWillMessage(context, clientId);
// 2. 会话清理
// 5. 会话清理
cleanSession(clientId);
// 3. 解绑 clientId
// 6. 解绑 clientId
Tio.unbindBsId(context);
// 4. 下线事件
// 7. 下线事件
notify(clientId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册