提交 e1e19b03 编写于 作者: 如梦技术's avatar 如梦技术 🐛

移除 lombok 添加 TODO 晚上整。

上级 3ba5df75
......@@ -42,7 +42,7 @@ public class MqttClientAioHandler implements ClientAioHandler {
public MqttClientAioHandler(MqttClientProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.processor =processor;
this.processor = processor;
}
@Override
......@@ -66,7 +66,7 @@ public class MqttClientAioHandler implements ClientAioHandler {
// 1. 先判断 mqtt 消息解析是否正常
DecoderResult decoderResult = message.decoderResult();
if (decoderResult.isFailure()) {
processFailure(context, message);
processFailure(message);
return;
}
MqttFixedHeader fixedHeader = message.fixedHeader();
......@@ -105,10 +105,9 @@ public class MqttClientAioHandler implements ClientAioHandler {
/**
* 处理失败
*
* @param context ChannelContext
* @param mqttMessage MqttMessage
*/
private void processFailure(ChannelContext context, MqttMessage mqttMessage) {
private void processFailure(MqttMessage mqttMessage) {
// 客户端失败,我认为日志记录异常就行了
Throwable cause = mqttMessage.decoderResult().getCause();
log.error(cause.getMessage(), cause);
......
......@@ -16,7 +16,6 @@
package net.dreamlu.iot.mqtt.core.client;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import org.tio.client.DefaultClientAioListener;
......@@ -30,12 +29,17 @@ import java.nio.charset.StandardCharsets;
*
* @author L.cm
*/
@RequiredArgsConstructor
public class MqttClientAioListener extends DefaultClientAioListener {
private final String clientId;
private final String username;
private final String password;
public MqttClientAioListener(String clientId, String username, String password) {
this.clientId = clientId;
this.username = username;
this.password = password;
}
@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (isConnected) {
......
......@@ -33,7 +33,7 @@ import java.nio.ByteBuffer;
* @author L.cm
*/
public class MqttServerAioHandler implements ServerAioHandler {
private static final Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private static Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final MqttServerProcessor processor;
......@@ -93,12 +93,15 @@ public class MqttServerAioHandler implements ServerAioHandler {
MqttFixedHeader fixedHeader = mqttMessage.fixedHeader();
MqttMessageType messageType = fixedHeader.messageType();
log.debug("MqttMessageType:{}", messageType);
// 2. 单独处理 CONNECT 的消息
// 3. 其他消息先判断是否连接、认证过
// TODO L.cm 还是设计 filter 去处理该问题???
// TODO L.cm t-io 的 bsid 是否可以用它来绑定 clientId
switch (messageType) {
case CONNECT:
processor.processConnect(context, (MqttConnectMessage) mqttMessage);
break;
case CONNACK:
break;
case PUBLISH:
processor.processPublish(context, (MqttPublishMessage) mqttMessage);
break;
......@@ -117,18 +120,12 @@ public class MqttServerAioHandler implements ServerAioHandler {
case SUBSCRIBE:
processor.processSubscribe(context, (MqttSubscribeMessage) mqttMessage);
break;
case SUBACK:
break;
case UNSUBSCRIBE:
processor.processUnSubscribe(context, (MqttUnsubscribeMessage) mqttMessage);
break;
case UNSUBACK:
break;
case PINGREQ:
processor.processPingReq(context);
break;
case PINGRESP:
break;
case DISCONNECT:
processor.processDisConnect(context);
break;
......@@ -161,6 +158,9 @@ public class MqttServerAioHandler implements ServerAioHandler {
.sessionPresent(false)
.build();
Tio.send(context, message);
} else if (cause instanceof DecoderException) {
log.error(cause.getMessage(), cause);
// 消息解码异常,
} else {
log.error(cause.getMessage(), cause);
// 发送断开连接,是否强制关闭客户端连接???
......
......@@ -27,7 +27,8 @@ import org.tio.core.DefaultAioListener;
public class MqttServerAioListener extends DefaultAioListener {
@Override
public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
// TODO L.cm 微调此处,三次超时时断开,避免长时间占用服务器连接
return true;
}
......
package net.dreamlu.iot.mqtt.server;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
......@@ -17,9 +17,8 @@ import java.util.stream.Collectors;
*
* @author L.cm
*/
@Slf4j
@RequiredArgsConstructor
public class MqttBrokerProcessorImpl implements MqttServerProcessor {
private static Logger log = LoggerFactory.getLogger(MqttBrokerProcessorImpl.class);
private static final String MQTT_CLIENT_ID_KEY = "mqttClientId";
@Override
......
......@@ -24,14 +24,6 @@
<module>mica-mqtt-example</module>
</modules>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册