提交 345bb393 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client onSubscribed 改为异步

上级 851ab360
...@@ -189,13 +189,17 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -189,13 +189,17 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
paddingSubscribe.onSubAckReceived(); paddingSubscribe.onSubAckReceived();
clientSession.removePaddingSubscribe(messageId); clientSession.removePaddingSubscribe(messageId);
clientSession.addSubscriptionList(subscribedList); clientSession.addSubscriptionList(subscribedList);
try { // 触发已经监听的事件
subscribedList.forEach(clientSubscription -> { subscribedList.forEach(clientSubscription -> {
clientSubscription.getListener().onSubscribed(clientSubscription.getTopicFilter(), clientSubscription.getMqttQoS()); IMqttClientMessageListener subscriptionListener = clientSubscription.getListener();
}); try {
} catch (Throwable e) { executor.execute(() ->
logger.error("MQTT SubscriptionList:{} subscribed onSubscribed event error.", subscribedList); subscriptionListener.onSubscribed(clientSubscription.getTopicFilter(), clientSubscription.getMqttQoS())
} );
} catch (Throwable e) {
logger.error("MQTT SubscriptionList:{} subscribed onSubscribed event error.", subscribedList);
}
});
} }
@Override @Override
......
...@@ -28,6 +28,7 @@ import org.tio.client.intf.ClientAioHandler; ...@@ -28,6 +28,7 @@ import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener; import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node; import org.tio.core.Node;
import org.tio.core.ssl.SslConfig; import org.tio.core.ssl.SslConfig;
import org.tio.utils.Threads;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory; import org.tio.utils.thread.pool.DefaultThreadFactory;
...@@ -393,7 +394,7 @@ public final class MqttClientCreator { ...@@ -393,7 +394,7 @@ public final class MqttClientCreator {
if (this.messageIdGenerator == null) { if (this.messageIdGenerator == null) {
this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator(); this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator();
} }
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttClient")); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Threads.CORE_POOL_SIZE, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor); IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
// 4. 初始化 mqtt 处理器 // 4. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor); ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
......
...@@ -40,6 +40,7 @@ import org.tio.server.ServerTioConfig; ...@@ -40,6 +40,7 @@ import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer; import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioHandler; import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener; import org.tio.server.intf.ServerAioListener;
import org.tio.utils.Threads;
import org.tio.utils.hutool.StrUtil; import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory; import org.tio.utils.thread.pool.DefaultThreadFactory;
...@@ -460,7 +461,7 @@ public class MqttServerCreator { ...@@ -460,7 +461,7 @@ public class MqttServerCreator {
if (this.connectStatusListener == null) { if (this.connectStatusListener == null) {
this.connectStatusListener = new DefaultMqttConnectStatusListener(); this.connectStatusListener = new DefaultMqttConnectStatusListener();
} }
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer")); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Threads.CORE_POOL_SIZE, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, executor); DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, executor);
// 1. 处理消息 // 1. 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor); ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册