From de5b48e12e096405e722391aae08e799732f5923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Fri, 3 Sep 2021 12:37:32 +0800 Subject: [PATCH] =?UTF-8?q?:bug:=20=E4=BF=AE=E5=A4=8D=20mqtt=20=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AB=AF=E5=BF=83=E8=B7=B3=E6=97=B6=E9=97=B4=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/support/DefaultMqttServerProcessor.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 4a61b41..9d45156 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -39,7 +39,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * mqtt broker 处理器 @@ -48,6 +47,11 @@ import java.util.concurrent.TimeUnit; */ public class DefaultMqttServerProcessor implements MqttServerProcessor { private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class); + /** + * 2 倍客户端 keepAlive 时间 + */ + private static final long KEEP_ALIVE_UNIT = 2000L; + private final long heartbeatTimeout; private final IMqttMessageStore messageStore; private final IMqttSessionManager sessionManager; private final IMqttServerAuthHandler authHandler; @@ -57,6 +61,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { private final ScheduledThreadPoolExecutor executor; public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) { + this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? 120_000L : serverCreator.getHeartbeatTimeout(); this.messageStore = serverCreator.getMessageStore(); this.sessionManager = serverCreator.getSessionManager(); this.authHandler = serverCreator.getAuthHandler(); @@ -93,9 +98,9 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader(); // 5. 心跳超时时间,当然这个值如果小于全局配置(默认:120s),定时检查的时间间隔还是以全局为准,只是在判断时用此值 int keepAliveSeconds = variableHeader.keepAliveTimeSeconds(); - if (keepAliveSeconds > 0) { - int heartbeatTimeout = (int) (keepAliveSeconds * 1.5f); - context.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(heartbeatTimeout)); + // 2倍客户端 keepAlive 时间作为服务端心跳超时时间,如果配置同全局默认不设置,节约内存 + if (keepAliveSeconds > 0 && heartbeatTimeout != keepAliveSeconds * KEEP_ALIVE_UNIT) { + context.setHeartbeatTimeout(keepAliveSeconds * KEEP_ALIVE_UNIT); } // 6. session 处理,先默认全部连接关闭时清除 // boolean cleanSession = variableHeader.isCleanSession(); -- GitLab