提交 851ab360 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client、server connectListener 改为异步

上级 9eb1652f
......@@ -100,11 +100,14 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (connectListener == null) {
return;
}
try {
connectListener.onConnected(context, context.isReconnect);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// 触发客户端连接事件
executor.submit(() -> {
try {
connectListener.onConnected(context, context.isReconnect);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
/**
......@@ -149,7 +152,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(reSubscriptionList, message);
Boolean result = Tio.send(context, message);
logger.info("MQTT subscriptionList:{} messageId:{} resubscribing result:{}", reSubscriptionList, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, msg));
clientSession.addPaddingSubscribe(messageId, pendingSubscription);
}
......@@ -276,7 +279,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
Tio.send(context, pubRelMessage);
pendingPublish.setPubRelMessage(pubRelMessage);
pendingPublish.startPubRelRetransmissionTimer(executor, msg -> Tio.send(context, msg));
pendingPublish.startPubRelRetransmissionTimer(executor, (msg) -> Tio.send(context, msg));
}
@Override
......
......@@ -29,6 +29,7 @@ import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 客户端监听器
......@@ -39,10 +40,12 @@ public class MqttClientAioListener extends DefaultClientAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
private final MqttConnectMessage connectMessage;
private final IMqttClientConnectListener connectListener;
private final ScheduledThreadPoolExecutor executor;
public MqttClientAioListener(MqttClientCreator clientCreator) {
public MqttClientAioListener(MqttClientCreator clientCreator, ScheduledThreadPoolExecutor executor) {
this.connectMessage = getConnectMessage(Objects.requireNonNull(clientCreator));
this.connectListener = clientCreator.getConnectListener();
this.executor = executor;
}
/**
......@@ -109,11 +112,13 @@ public class MqttClientAioListener extends DefaultClientAioListener {
return;
}
// 2. 触发客户断开连接事件
try {
connectListener.onDisconnect(channelContext, throwable, remark, isRemove);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
executor.submit(() -> {
try {
connectListener.onDisconnect(channelContext, throwable, remark, isRemove);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
}
......@@ -393,11 +393,11 @@ public final class MqttClientCreator {
if (this.messageIdGenerator == null) {
this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
// 4. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
ClientAioListener clientAioListener = new MqttClientAioListener(this);
ClientAioListener clientAioListener = new MqttClientAioListener(this, executor);
// 5. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
......
......@@ -30,6 +30,8 @@ import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.utils.hutool.StrUtil;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 服务监听
*
......@@ -41,12 +43,14 @@ public class MqttServerAioListener extends DefaultAioListener {
private final IMqttSessionManager sessionManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
private final ScheduledThreadPoolExecutor executor;
public MqttServerAioListener(MqttServerCreator serverCreator) {
public MqttServerAioListener(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.executor = executor;
}
@Override
......@@ -113,11 +117,13 @@ public class MqttServerAioListener extends DefaultAioListener {
}
private void notify(ChannelContext context, String clientId) {
try {
connectStatusListener.offline(context, clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
executor.execute(() -> {
try {
connectStatusListener.offline(context, clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
});
}
@Override
......
......@@ -465,7 +465,7 @@ public class MqttServerCreator {
// 1. 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor);
// 2. t-io 监听
ServerAioListener listener = new MqttServerAioListener(this);
ServerAioListener listener = new MqttServerAioListener(this, executor);
// 3. t-io 配置
ServerTioConfig tioConfig = new ServerTioConfig(this.name, handler, listener);
// 4. 设置 t-io 心跳 timeout
......
......@@ -157,16 +157,16 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
willMessage.setNode(nodeName);
messageStore.addWillMessage(uniqueId, willMessage);
}
// 9. 在线状态
try {
connectStatusListener.online(context, uniqueId);
} catch (Throwable e) {
logger.error("mqtt connectStatusListener", e);
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return;
}
// 10. 返回 ack
// 9. 返回 ack
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED);
// 10. 在线状态
executor.execute(() -> {
try {
connectStatusListener.online(context, uniqueId);
} catch (Throwable e) {
logger.error("Mqtt server uniqueId:{} clientId:{} online notify error.", uniqueId, clientId, e);
}
});
}
private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) {
......
......@@ -2,7 +2,7 @@ log4j.rootLogger = DEBUG
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d %-5p %c{2} - %m%n
appender.console.layout.pattern = %d [%t] %-5p %c{2} - %m%n
# debug
rootLogger.level = INFO
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册