提交 fa47da1c 编写于 作者: 如梦技术's avatar 如梦技术 🐛

🐛 修复 websocket 下线无法触发offline gitee #I47K13

上级 b51f497c
......@@ -27,5 +27,9 @@ public interface MqttConst {
* 正常断开连接
*/
String DIS_CONNECTED = "disconnected";
/**
* 是 http 协议
*/
String IS_HTTP = "is_http";
}
......@@ -26,6 +26,11 @@ import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.DefaultAioListener;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.http.common.HttpConst;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.utils.hutool.StrUtil;
/**
......@@ -56,26 +61,31 @@ public class MqttServerAioListener extends DefaultAioListener {
@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
// 1. 业务 id
// 1. http 请求跳过
boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null;
if (isHttpRequest) {
return;
}
// 2. 业务 id
String clientId = context.getBsId();
// 2. 判断是否正常断开
// 3. 判断是否正常断开
boolean isNotNormalDisconnect = context.get(MqttConst.DIS_CONNECTED) == null;
if (isNotNormalDisconnect || throwable != null) {
logger.error("Mqtt server close clientId isBlank, remark:{} isRemove:{}", remark, isRemove, throwable);
} else {
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
}
// 3. 业务 id 不能为空
// 4. 业务 id 不能为空
if (StrUtil.isBlank(clientId)) {
return;
}
// 4. 对于异常断开连接,处理遗嘱消息
// 5. 对于异常断开连接,处理遗嘱消息
sendWillMessage(context, clientId);
// 5. 会话清理
// 6. 会话清理
cleanSession(clientId);
// 6. 解绑 clientId
// 7. 解绑 clientId
Tio.unbindBsId(context);
// 7. 下线事件
// 8. 下线事件
notify(clientId);
}
......@@ -115,4 +125,39 @@ public class MqttServerAioListener extends DefaultAioListener {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
}
@Override
public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) {
if (!(packet instanceof HttpResponse)) {
return;
}
// 1. 短链接数据解绑
TioConfig tioConfig = context.getTioConfig();
tioConfig.groups.unbind(context);
tioConfig.bsIds.unbind(context);
tioConfig.ids.unbind(context);
tioConfig.clientNodes.remove(context);
tioConfig.tokens.unbind(context);
// 2. 关闭
HttpResponse httpResponse = (HttpResponse) packet;
HttpRequest request = httpResponse.getHttpRequest();
if (request != null) {
if (request.httpConfig.compatible1_0) {
if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) {
if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine());
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
}
}
}
......@@ -208,6 +208,7 @@ import org.tio.http.common.HttpUuid;
import org.tio.http.common.handler.HttpRequestHandler;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.Threads;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
......@@ -226,31 +227,26 @@ import java.util.concurrent.ThreadPoolExecutor;
public class MqttWebServer {
private static final Logger logger = LoggerFactory.getLogger(MqttWebServer.class);
private static final String TIO_SYSTEM_TIMER_PERIOD = "tio.system.timer.period";
private final MqttWebServerAioListener mqttWebServerAioListener;
private final ServerAioListener serverAioListener;
private final HttpRequestHandler httpRequestHandler;
private HttpConfig httpConfig = null;
private ServerTioConfig serverTioConfig = null;
private MqttWebServerAioHandler mqttWebServerAioHandler = null;
private TioServer tioServer = null;
public MqttWebServer(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler) {
this(serverCreator, new MqttHttpRequestHandler(), wsMsgHandler);
public MqttWebServer(MqttServerCreator serverCreator, ServerAioListener serverAioListener, IWsMsgHandler wsMsgHandler) {
this(serverCreator, serverAioListener, new MqttHttpRequestHandler(), wsMsgHandler, null, null);
}
public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) {
this(serverCreator, requestHandler, wsMsgHandler, null, null);
}
public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
public MqttWebServer(MqttServerCreator serverCreator, ServerAioListener serverAioListener, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
if (tioExecutor == null) {
tioExecutor = Threads.getTioExecutor();
}
if (groupExecutor == null) {
groupExecutor = Threads.getGroupExecutor();
}
this.httpRequestHandler = requestHandler;
this.mqttWebServerAioListener = new MqttWebServerAioListener();
this.serverAioListener = serverAioListener;
init(serverCreator, wsMsgHandler, tioExecutor, groupExecutor);
}
......@@ -266,10 +262,6 @@ public class MqttWebServer {
return mqttWebServerAioHandler;
}
public MqttWebServerAioListener getMqttWebServerAioListener() {
return mqttWebServerAioListener;
}
public ServerTioConfig getServerTioConfig() {
return serverTioConfig;
}
......@@ -287,7 +279,7 @@ public class MqttWebServer {
this.httpConfig = httpConfig;
this.httpConfig.setHttpRequestHandler(this.httpRequestHandler);
this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, this.httpRequestHandler, wsMsgHandler);
this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor);
this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, this.serverAioListener, tioExecutor, groupExecutor);
this.serverTioConfig.setHeartbeatTimeout(0);
this.serverTioConfig.setReadBufferSize(1024 * 30);
this.serverTioConfig.setTioUuid(new HttpUuid());
......@@ -336,8 +328,9 @@ public class MqttWebServer {
}
// 3. 初始化处理器
AioHandler mqttAioHandler = mqttServerConfig.getAioHandler();
ServerAioListener mqttAioListener = (ServerAioListener) mqttServerConfig.getAioListener();
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(serverCreator, mqttAioHandler);
MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler);
MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttAioListener, mqttWsMsgHandler);
ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig();
// 4. tcp + websocket mqtt 共享公共配置
httpIioConfig.share(mqttServerConfig);
......
......@@ -194,6 +194,7 @@
package net.dreamlu.iot.mqtt.core.server.http.core;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -250,41 +251,42 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
@SuppressWarnings("unchecked")
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException {
WsSessionContext wsSessionContext = (WsSessionContext) context.get();
// 尚未握手
if (wsSessionContext == null) {
HttpRequest request = HttpRequestDecoder.decode(buffer, limit, position, readableLength, channelContext, httpConfig);
HttpRequest request = HttpRequestDecoder.decode(buffer, limit, position, readableLength, context, httpConfig);
if (request == null) {
return null;
}
HttpResponse httpResponse = updateWebSocketProtocol(request);
// 普通 http 非 websocket
if (httpResponse == null) {
context.set(MqttConst.IS_HTTP, (byte) 1);
return request;
}
wsSessionContext = new WsSessionContext();
channelContext.set(wsSessionContext);
context.set(wsSessionContext);
wsSessionContext.setHandshakeRequest(request);
wsSessionContext.setHandshakeResponse(httpResponse);
WsRequest wsRequestPacket = new WsRequest();
wsRequestPacket.setHandShake(true);
return wsRequestPacket;
}
WsRequest websocketPacket = WsServerDecoder.decode(buffer, channelContext);
WsRequest websocketPacket = WsServerDecoder.decode(buffer, context);
if (websocketPacket != null) {
// 数据包尚未完成
if (!websocketPacket.isWsEof()) {
List<WsRequest> parts = (List<WsRequest>) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
List<WsRequest> parts = (List<WsRequest>) context.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
if (parts == null) {
parts = new ArrayList<>();
channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts);
context.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts);
}
parts.add(websocketPacket);
} else {
List<WsRequest> parts = (List<WsRequest>) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
List<WsRequest> parts = (List<WsRequest>) context.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
if (parts != null) {
channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null);
context.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null);
parts.add(websocketPacket);
WsRequest first = parts.get(0);
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.http.core;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.http.common.HttpConst;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.server.intf.ServerAioListener;
/**
* 兼容 websocket,参考 HTTPServerAioListener
*
* @author tanyaowu
* @author L.cm
*/
public class MqttWebServerAioListener implements ServerAioListener {
@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
}
@Override
public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) {
}
@Override
public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) {
if (!(packet instanceof HttpResponse)) {
return;
}
// 1. 短链接数据解绑
TioConfig tioConfig = context.getTioConfig();
tioConfig.groups.unbind(context);
tioConfig.bsIds.unbind(context);
tioConfig.ids.unbind(context);
tioConfig.clientNodes.remove(context);
tioConfig.tokens.unbind(context);
// 2. 关闭
HttpResponse httpResponse = (HttpResponse) packet;
HttpRequest request = httpResponse.getHttpRequest();
if (request != null) {
if (request.httpConfig.compatible1_0) {
if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) {
if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine());
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
}
}
@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
}
@Override
public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception {
}
@Override
public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
}
@Override
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
return false;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册