From fa47da1c27543cf88c4c73ebe383926f113624b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Fri, 27 Aug 2021 17:16:41 +0800 Subject: [PATCH] =?UTF-8?q?:bug:=20=E4=BF=AE=E5=A4=8D=20websocket=20?= =?UTF-8?q?=E4=B8=8B=E7=BA=BF=E6=97=A0=E6=B3=95=E8=A7=A6=E5=8F=91offline?= =?UTF-8?q?=20gitee=20#I47K13?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/core/server/MqttConst.java | 4 + .../core/server/MqttServerAioListener.java | 59 +++++++++-- .../core/server/http/core/MqttWebServer.java | 25 ++--- .../http/core/MqttWebServerAioHandler.java | 20 ++-- .../http/core/MqttWebServerAioListener.java | 100 ------------------ 5 files changed, 76 insertions(+), 132 deletions(-) delete mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java index 8202788..cbec22a 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java @@ -27,5 +27,9 @@ public interface MqttConst { * 正常断开连接 */ String DIS_CONNECTED = "disconnected"; + /** + * 是 http 协议 + */ + String IS_HTTP = "is_http"; } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java index 2918107..1d97eb2 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java @@ -26,6 +26,11 @@ import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import org.tio.core.DefaultAioListener; import org.tio.core.Tio; +import org.tio.core.TioConfig; +import org.tio.core.intf.Packet; +import org.tio.http.common.HttpConst; +import org.tio.http.common.HttpRequest; +import org.tio.http.common.HttpResponse; import org.tio.utils.hutool.StrUtil; /** @@ -56,26 +61,31 @@ public class MqttServerAioListener extends DefaultAioListener { @Override public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { - // 1. 业务 id + // 1. http 请求跳过 + boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null; + if (isHttpRequest) { + return; + } + // 2. 业务 id String clientId = context.getBsId(); - // 2. 判断是否正常断开 + // 3. 判断是否正常断开 boolean isNotNormalDisconnect = context.get(MqttConst.DIS_CONNECTED) == null; if (isNotNormalDisconnect || throwable != null) { logger.error("Mqtt server close clientId isBlank, remark:{} isRemove:{}", remark, isRemove, throwable); } else { logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove); } - // 3. 业务 id 不能为空 + // 4. 业务 id 不能为空 if (StrUtil.isBlank(clientId)) { return; } - // 4. 对于异常断开连接,处理遗嘱消息 + // 5. 对于异常断开连接,处理遗嘱消息 sendWillMessage(context, clientId); - // 5. 会话清理 + // 6. 会话清理 cleanSession(clientId); - // 6. 解绑 clientId + // 7. 解绑 clientId Tio.unbindBsId(context); - // 7. 下线事件 + // 8. 下线事件 notify(clientId); } @@ -115,4 +125,39 @@ public class MqttServerAioListener extends DefaultAioListener { logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable); } } + + @Override + public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) { + if (!(packet instanceof HttpResponse)) { + return; + } + // 1. 短链接数据解绑 + TioConfig tioConfig = context.getTioConfig(); + tioConfig.groups.unbind(context); + tioConfig.bsIds.unbind(context); + tioConfig.ids.unbind(context); + tioConfig.clientNodes.remove(context); + tioConfig.tokens.unbind(context); + // 2. 关闭 + HttpResponse httpResponse = (HttpResponse) packet; + HttpRequest request = httpResponse.getHttpRequest(); + if (request != null) { + if (request.httpConfig.compatible1_0) { + if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { + if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } + } + } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index 18f1011..c9e78ef 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -208,6 +208,7 @@ import org.tio.http.common.HttpUuid; import org.tio.http.common.handler.HttpRequestHandler; import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; +import org.tio.server.intf.ServerAioListener; import org.tio.utils.Threads; import org.tio.utils.hutool.StrUtil; import org.tio.utils.thread.pool.SynThreadPoolExecutor; @@ -226,31 +227,26 @@ import java.util.concurrent.ThreadPoolExecutor; public class MqttWebServer { private static final Logger logger = LoggerFactory.getLogger(MqttWebServer.class); private static final String TIO_SYSTEM_TIMER_PERIOD = "tio.system.timer.period"; - private final MqttWebServerAioListener mqttWebServerAioListener; + private final ServerAioListener serverAioListener; private final HttpRequestHandler httpRequestHandler; private HttpConfig httpConfig = null; private ServerTioConfig serverTioConfig = null; private MqttWebServerAioHandler mqttWebServerAioHandler = null; private TioServer tioServer = null; - public MqttWebServer(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler) { - this(serverCreator, new MqttHttpRequestHandler(), wsMsgHandler); + public MqttWebServer(MqttServerCreator serverCreator, ServerAioListener serverAioListener, IWsMsgHandler wsMsgHandler) { + this(serverCreator, serverAioListener, new MqttHttpRequestHandler(), wsMsgHandler, null, null); } - public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) { - this(serverCreator, requestHandler, wsMsgHandler, null, null); - } - - public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { + public MqttWebServer(MqttServerCreator serverCreator, ServerAioListener serverAioListener, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { if (tioExecutor == null) { tioExecutor = Threads.getTioExecutor(); } if (groupExecutor == null) { groupExecutor = Threads.getGroupExecutor(); } - this.httpRequestHandler = requestHandler; - this.mqttWebServerAioListener = new MqttWebServerAioListener(); + this.serverAioListener = serverAioListener; init(serverCreator, wsMsgHandler, tioExecutor, groupExecutor); } @@ -266,10 +262,6 @@ public class MqttWebServer { return mqttWebServerAioHandler; } - public MqttWebServerAioListener getMqttWebServerAioListener() { - return mqttWebServerAioListener; - } - public ServerTioConfig getServerTioConfig() { return serverTioConfig; } @@ -287,7 +279,7 @@ public class MqttWebServer { this.httpConfig = httpConfig; this.httpConfig.setHttpRequestHandler(this.httpRequestHandler); this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, this.httpRequestHandler, wsMsgHandler); - this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor); + this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, this.serverAioListener, tioExecutor, groupExecutor); this.serverTioConfig.setHeartbeatTimeout(0); this.serverTioConfig.setReadBufferSize(1024 * 30); this.serverTioConfig.setTioUuid(new HttpUuid()); @@ -336,8 +328,9 @@ public class MqttWebServer { } // 3. 初始化处理器 AioHandler mqttAioHandler = mqttServerConfig.getAioHandler(); + ServerAioListener mqttAioListener = (ServerAioListener) mqttServerConfig.getAioListener(); IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(serverCreator, mqttAioHandler); - MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler); + MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttAioListener, mqttWsMsgHandler); ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig(); // 4. tcp + websocket mqtt 共享公共配置 httpIioConfig.share(mqttServerConfig); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java index 5770480..f8a9015 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java @@ -194,6 +194,7 @@ package net.dreamlu.iot.mqtt.core.server.http.core; +import net.dreamlu.iot.mqtt.core.server.MqttConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; @@ -250,41 +251,42 @@ public class MqttWebServerAioHandler implements ServerAioHandler { @SuppressWarnings("unchecked") @Override - public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException { - WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get(); + public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException { + WsSessionContext wsSessionContext = (WsSessionContext) context.get(); // 尚未握手 if (wsSessionContext == null) { - HttpRequest request = HttpRequestDecoder.decode(buffer, limit, position, readableLength, channelContext, httpConfig); + HttpRequest request = HttpRequestDecoder.decode(buffer, limit, position, readableLength, context, httpConfig); if (request == null) { return null; } HttpResponse httpResponse = updateWebSocketProtocol(request); // 普通 http 非 websocket if (httpResponse == null) { + context.set(MqttConst.IS_HTTP, (byte) 1); return request; } wsSessionContext = new WsSessionContext(); - channelContext.set(wsSessionContext); + context.set(wsSessionContext); wsSessionContext.setHandshakeRequest(request); wsSessionContext.setHandshakeResponse(httpResponse); WsRequest wsRequestPacket = new WsRequest(); wsRequestPacket.setHandShake(true); return wsRequestPacket; } - WsRequest websocketPacket = WsServerDecoder.decode(buffer, channelContext); + WsRequest websocketPacket = WsServerDecoder.decode(buffer, context); if (websocketPacket != null) { // 数据包尚未完成 if (!websocketPacket.isWsEof()) { - List parts = (List) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS); + List parts = (List) context.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS); if (parts == null) { parts = new ArrayList<>(); - channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts); + context.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts); } parts.add(websocketPacket); } else { - List parts = (List) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS); + List parts = (List) context.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS); if (parts != null) { - channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null); + context.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null); parts.add(websocketPacket); WsRequest first = parts.get(0); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java deleted file mode 100644 index 3409147..0000000 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.core.server.http.core; - -import org.tio.core.ChannelContext; -import org.tio.core.Tio; -import org.tio.core.TioConfig; -import org.tio.core.intf.Packet; -import org.tio.http.common.HttpConst; -import org.tio.http.common.HttpRequest; -import org.tio.http.common.HttpResponse; -import org.tio.server.intf.ServerAioListener; - -/** - * 兼容 websocket,参考 HTTPServerAioListener - * - * @author tanyaowu - * @author L.cm - */ -public class MqttWebServerAioListener implements ServerAioListener { - - @Override - public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) { - - } - - @Override - public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) { - - } - - @Override - public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) { - if (!(packet instanceof HttpResponse)) { - return; - } - // 1. 短链接数据解绑 - TioConfig tioConfig = context.getTioConfig(); - tioConfig.groups.unbind(context); - tioConfig.bsIds.unbind(context); - tioConfig.ids.unbind(context); - tioConfig.clientNodes.remove(context); - tioConfig.tokens.unbind(context); - // 2. 关闭 - HttpResponse httpResponse = (HttpResponse) packet; - HttpRequest request = httpResponse.getHttpRequest(); - if (request != null) { - if (request.httpConfig.compatible1_0) { - if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { - if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { - Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } - } - - @Override - public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { - - } - - @Override - public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception { - - } - - @Override - public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception { - - } - - @Override - public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) { - return false; - } - -} -- GitLab