提交 1f28b537 编写于 作者: 浅梦2013's avatar 浅梦2013

避免客户端启动卡住

上级 630aa184
......@@ -27,7 +27,6 @@ import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
......@@ -38,14 +37,11 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientStore clientStore;
private final CountDownLatch connLatch;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttClientProcessor(MqttClientStore clientStore,
CountDownLatch connLatch,
ScheduledThreadPoolExecutor executor) {
this.clientStore = clientStore;
this.connLatch = connLatch;
this.executor = executor;
}
......@@ -60,7 +56,6 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
MqttConnectReturnCode returnCode = message.variableHeader().connectReturnCode();
switch (returnCode) {
case CONNECTION_ACCEPTED:
connLatch.countDown();
if (logger.isInfoEnabled()) {
Node node = context.getServerNode();
logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
......
......@@ -31,7 +31,6 @@ import org.tio.core.ssl.SslConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
......@@ -282,10 +281,8 @@ public final class MqttClientCreator {
this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
}
MqttClientStore clientStore = new MqttClientStore();
// 客户端处理器
CountDownLatch connLatch = new CountDownLatch(1);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, connLatch, executor);
IMqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, executor);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this.bufferAllocator, processor);
ClientAioListener clientAioListener = new MqttClientAioListener(this, clientStore, executor);
......@@ -307,8 +304,6 @@ public final class MqttClientCreator {
try {
TioClient tioClient = new TioClient(tioConfig);
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
// 7. 等待连接成功之后继续
connLatch.await();
return new MqttClient(tioClient, this, context, clientStore, executor);
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
......
......@@ -4,15 +4,13 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>mica-mqtt-spring-boot-example</artifactId>
<version>1.0.0</version>
<name>${artifactId}</name>
<url>https://www.dreamlu.net</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt</artifactId>
<version>${revision}</version>
</parent>
<properties>
......
......@@ -22,3 +22,8 @@ management:
web:
exposure:
include: '*'
logging:
level:
root: info
server: info # t-io 服务端默认日志
org.tio: info # t-io 服务端默认日志
......@@ -24,6 +24,7 @@ import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.tio.client.ClientChannelContext;
import java.nio.ByteBuffer;
......@@ -35,7 +36,7 @@ import java.nio.ByteBuffer;
*/
@Slf4j
@RequiredArgsConstructor
public class MqttClientTemplate implements InitializingBean, DisposableBean {
public class MqttClientTemplate implements InitializingBean, DisposableBean, Ordered {
private final MqttClientCreator mqttClientCreator;
private MqttClient client;
......@@ -190,4 +191,9 @@ public class MqttClientTemplate implements InitializingBean, DisposableBean {
client.stop();
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册