提交 03961933 编写于 作者: 如梦技术's avatar 如梦技术 🐛

优化 mqtt client 连接处理的逻辑。

上级 e4fd54d3
...@@ -37,11 +37,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; ...@@ -37,11 +37,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class DefaultMqttClientProcessor implements IMqttClientProcessor { public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class); private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientStore clientStore; private final MqttClientStore clientStore;
private final IMqttClientConnectListener connectListener;
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
public DefaultMqttClientProcessor(MqttClientStore clientStore, public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator,
MqttClientStore clientStore,
ScheduledThreadPoolExecutor executor) { ScheduledThreadPoolExecutor executor) {
this.clientStore = clientStore; this.clientStore = clientStore;
this.connectListener = mqttClientCreator.getConnectListener();
this.executor = executor; this.executor = executor;
} }
...@@ -53,13 +56,21 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -53,13 +56,21 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
@Override @Override
public void processConAck(ChannelContext context, MqttConnAckMessage message) { public void processConAck(ChannelContext context, MqttConnAckMessage message) {
MqttConnectReturnCode returnCode = message.variableHeader().connectReturnCode(); MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
MqttConnectReturnCode returnCode = connAckVariableHeader.connectReturnCode();
switch (returnCode) { switch (returnCode) {
case CONNECTION_ACCEPTED: case CONNECTION_ACCEPTED:
// 1. 连接成功的日志
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
Node node = context.getServerNode(); Node node = context.getServerNode();
logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort()); logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
} }
// 2. 发布连接通知
publishConnectEvent(context);
// 3. 如果 session 不存在重连时发送重新订阅
if (!connAckVariableHeader.isSessionPresent()) {
reSendSubscription(context);
}
break; break;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
case CONNECTION_REFUSED_IDENTIFIER_REJECTED: case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
...@@ -73,6 +84,36 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -73,6 +84,36 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
} }
} }
private void reSendSubscription(ChannelContext context) {
List<MqttClientSubscription> subscriptionList = clientStore.getAndCleanSubscription();
for (MqttClientSubscription subscription : subscriptionList) {
int messageId = MqttClientMessageId.getId();
MqttQoS mqttQoS = subscription.getMqttQoS();
String topicFilter = subscription.getTopicFilter();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
.messageId(messageId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, subscription.getListener(), message);
Boolean result = Tio.send(context, message);
logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} resubscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientStore.addPaddingSubscribe(messageId, pendingSubscription);
}
}
private void publishConnectEvent(ChannelContext context) {
// 先判断是否配置监听
if (connectListener == null) {
return;
}
try {
connectListener.onConnected(context);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
@Override @Override
public void processSubAck(MqttSubAckMessage message) { public void processSubAck(MqttSubAckMessage message) {
int messageId = message.variableHeader().messageId(); int messageId = message.variableHeader().messageId();
......
...@@ -30,8 +30,7 @@ public interface IMqttClientConnectListener { ...@@ -30,8 +30,7 @@ public interface IMqttClientConnectListener {
* 监听到消息 * 监听到消息
* *
* @param context ChannelContext * @param context ChannelContext
* @param isReconnect 是否重连
*/ */
void onConnected(ChannelContext context, boolean isReconnect); void onConnected(ChannelContext context);
} }
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
package net.dreamlu.iot.mqtt.core.client; package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*; import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tio.client.DefaultClientAioListener; import org.tio.client.DefaultClientAioListener;
...@@ -25,9 +27,7 @@ import org.tio.core.Tio; ...@@ -25,9 +27,7 @@ import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/** /**
* mqtt 客户端监听器 * mqtt 客户端监听器
...@@ -36,99 +36,56 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; ...@@ -36,99 +36,56 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/ */
public class MqttClientAioListener extends DefaultClientAioListener { public class MqttClientAioListener extends DefaultClientAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class); private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
private final MqttClientCreator clientConfig; private final MqttConnectMessage connectMessage;
private final MqttWillMessage willMessage;
private final MqttClientStore clientStore;
private final IMqttClientConnectListener connectListener;
private final ScheduledThreadPoolExecutor executor;
public MqttClientAioListener(MqttClientCreator clientConfig, public MqttClientAioListener(MqttClientCreator mqttClientCreator) {
MqttClientStore clientStore, this.connectMessage = getConnectMessage(Objects.requireNonNull(mqttClientCreator));
ScheduledThreadPoolExecutor executor) {
this.clientConfig = Objects.requireNonNull(clientConfig);
this.willMessage = clientConfig.getWillMessage();
this.clientStore = clientStore;
this.connectListener = clientConfig.getConnectListener();
this.executor = executor;
}
@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (isConnected) {
// 1. 建立连接后发送 mqtt 连接的消息
MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect()
.clientId(clientConfig.getClientId())
.username(clientConfig.getUsername())
.keepAlive(clientConfig.getKeepAliveSecs())
.cleanSession(clientConfig.isCleanSession())
.protocolVersion(clientConfig.getVersion())
.willFlag(willMessage != null);
// 2. 密码
String password = clientConfig.getPassword();
if (StrUtil.isNotBlank(password)) {
builder.password(password.getBytes(StandardCharsets.UTF_8));
}
// 3. 遗嘱消息
if (willMessage != null) {
builder.willTopic(willMessage.getTopic())
.willMessage(willMessage.getMessage())
.willRetain(willMessage.isRetain())
.willQoS(willMessage.getQos())
.willProperties(willMessage.getWillProperties());
}
// 4. mqtt5 properties
MqttProperties properties = clientConfig.getProperties();
if (properties != null) {
builder.properties(properties);
}
// 5. 发送 mqtt 连接消息
sendConnectMessage(context, builder.build());
// 6. 重连时发送重新订阅
reSendSubscription(context);
// 7. 发布连接通知
publishConnectEvent(context, isReconnect);
}
} }
/** /**
* 发送连接的消息 * 构造连接消息
* *
* @param context ChannelContext * @param mqttClientCreator MqttClientCreator
* @param message MqttMessage * @return MqttConnectMessage
*/ */
private static void sendConnectMessage(ChannelContext context, MqttMessage message) { private static MqttConnectMessage getConnectMessage(MqttClientCreator mqttClientCreator) {
// 5. 发送 mqtt 连接消息 MqttWillMessage willMessage = mqttClientCreator.getWillMessage();
Boolean result = Tio.send(context, message); // 1. 建立连接后发送 mqtt 连接的消息
logger.info("MqttClient reconnect send connect result:{}", result); MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect()
} .clientId(mqttClientCreator.getClientId())
.username(mqttClientCreator.getUsername())
private void reSendSubscription(ChannelContext context) { .keepAlive(mqttClientCreator.getKeepAliveSecs())
List<MqttClientSubscription> subscriptionList = clientStore.getAndCleanSubscription(); .cleanSession(mqttClientCreator.isCleanSession())
for (MqttClientSubscription subscription : subscriptionList) { .protocolVersion(mqttClientCreator.getVersion())
int messageId = MqttClientMessageId.getId(); .willFlag(willMessage != null);
MqttQoS mqttQoS = subscription.getMqttQoS(); // 2. 密码
String topicFilter = subscription.getTopicFilter(); String password = mqttClientCreator.getPassword();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe() if (StrUtil.isNotBlank(password)) {
.addSubscription(mqttQoS, topicFilter) builder.password(password.getBytes(StandardCharsets.UTF_8));
.messageId(messageId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, subscription.getListener(), message);
Boolean result = Tio.send(context, message);
logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} resubscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientStore.addPaddingSubscribe(messageId, pendingSubscription);
} }
// 3. 遗嘱消息
if (willMessage != null) {
builder.willTopic(willMessage.getTopic())
.willMessage(willMessage.getMessage())
.willRetain(willMessage.isRetain())
.willQoS(willMessage.getQos())
.willProperties(willMessage.getWillProperties());
}
// 4. mqtt5 properties
MqttProperties properties = mqttClientCreator.getProperties();
if (properties != null) {
builder.properties(properties);
}
return builder.build();
} }
private void publishConnectEvent(ChannelContext context, boolean isReconnect) { @Override
// 先判断是否配置监听 public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (connectListener == null) { if (isConnected) {
return; // 重连时,发送 mqtt 连接消息
} Boolean result = Tio.send(context, this.connectMessage);
try { logger.info("MqttClient reconnect send connect result:{}", result);
connectListener.onConnected(context, isReconnect);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} }
} }
} }
...@@ -322,10 +322,10 @@ public final class MqttClientCreator { ...@@ -322,10 +322,10 @@ public final class MqttClientCreator {
} }
MqttClientStore clientStore = new MqttClientStore(); MqttClientStore clientStore = new MqttClientStore();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient")); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, executor); IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, clientStore, executor);
// 2. 初始化 mqtt 处理器 // 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor); ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
ClientAioListener clientAioListener = new MqttClientAioListener(this, clientStore, executor); ClientAioListener clientAioListener = new MqttClientAioListener(this);
// 3. 重连配置 // 3. 重连配置
ReconnConf reconnConf = null; ReconnConf reconnConf = null;
if (this.reconnect) { if (this.reconnect) {
......
...@@ -31,5 +31,9 @@ public interface MqttConst { ...@@ -31,5 +31,9 @@ public interface MqttConst {
* 是 http 协议 * 是 http 协议
*/ */
String IS_HTTP = "is_http"; String IS_HTTP = "is_http";
/**
* session 有效期,小于等于 0,关闭时清理,大于 0 采用缓存处理
*/
String SESSION_EXPIRES = "session_expires";
} }
...@@ -80,7 +80,9 @@ public class MqttServerAioListener extends DefaultAioListener { ...@@ -80,7 +80,9 @@ public class MqttServerAioListener extends DefaultAioListener {
return; return;
} }
// 5. 对于异常断开连接,处理遗嘱消息 // 5. 对于异常断开连接,处理遗嘱消息
sendWillMessage(context, clientId); if (isNotNormalDisconnect) {
sendWillMessage(clientId);
}
// 6. 会话清理 // 6. 会话清理
cleanSession(clientId); cleanSession(clientId);
// 7. 解绑 clientId // 7. 解绑 clientId
...@@ -89,13 +91,8 @@ public class MqttServerAioListener extends DefaultAioListener { ...@@ -89,13 +91,8 @@ public class MqttServerAioListener extends DefaultAioListener {
notify(clientId); notify(clientId);
} }
private void sendWillMessage(ChannelContext context, String clientId) { private void sendWillMessage(String clientId) {
// 1. 判断是否正常断开 // 发送遗嘱消息
Object normalDisconnectMark = context.get(MqttConst.DIS_CONNECTED);
if (normalDisconnectMark != null) {
return;
}
// 2. 发送遗嘱消息
try { try {
Message willMessage = messageStore.getWillMessage(clientId); Message willMessage = messageStore.getWillMessage(clientId);
if (willMessage == null) { if (willMessage == null) {
......
...@@ -162,7 +162,10 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { ...@@ -162,7 +162,10 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
@Override @Override
public boolean hasSession(String clientId) { public boolean hasSession(String clientId) {
return false; return pendingQos2PublishStore.containsKey(clientId)
|| pendingPublishStore.containsKey(clientId)
|| messageIdStore.containsKey(clientId)
|| subscribeStore.values().stream().anyMatch(data -> data.containsKey(clientId));
} }
@Override @Override
......
...@@ -48,8 +48,8 @@ public class MqttClientTest { ...@@ -48,8 +48,8 @@ public class MqttClientTest {
// .maxBytesInMessage(1024 * 10) // .maxBytesInMessage(1024 * 10)
.version(MqttVersion.MQTT_5) .version(MqttVersion.MQTT_5)
// 连接监听 // 连接监听
.connectListener((context, isReconnect) -> { .connectListener((context) -> {
logger.info("接服务器成功..."); logger.info("接服务器成功...");
}) })
.connect(); .connect();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册