提交 e1f6c182 编写于 作者: 浅梦2013's avatar 浅梦2013

mqtt 客户端日志优化。

上级 beca6336
...@@ -22,7 +22,9 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish; ...@@ -22,7 +22,9 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext; import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio; import org.tio.core.Tio;
import org.tio.core.TioConfig;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
...@@ -60,7 +62,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -60,7 +62,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
switch (returnCode) { switch (returnCode) {
case CONNECTION_ACCEPTED: case CONNECTION_ACCEPTED:
connLatch.countDown(); connLatch.countDown();
logger.info("MqttClient connection succeeded!"); if (logger.isInfoEnabled()) {
Node node = context.getServerNode();
logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
}
break; break;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
case CONNECTION_REFUSED_IDENTIFIER_REJECTED: case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
...@@ -82,6 +87,9 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -82,6 +87,9 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (paddingSubscribe == null) { if (paddingSubscribe == null) {
return; return;
} }
if (logger.isInfoEnabled()) {
logger.info("MQTT Topic:{} successfully subscribed messageId:{}", paddingSubscribe.getTopicFilter(), messageId);
}
paddingSubscribe.onSubAckReceived(); paddingSubscribe.onSubAckReceived();
clientStore.removePaddingSubscribe(messageId); clientStore.removePaddingSubscribe(messageId);
clientStore.addSubscription(paddingSubscribe.toSubscription()); clientStore.addSubscription(paddingSubscribe.toSubscription());
...@@ -130,6 +138,9 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -130,6 +138,9 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (pendingUnSubscription == null) { if (pendingUnSubscription == null) {
return; return;
} }
if (logger.isInfoEnabled()) {
logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", pendingUnSubscription.getTopic(), messageId);
}
pendingUnSubscription.onUnSubAckReceived(); pendingUnSubscription.onUnSubAckReceived();
clientStore.removePaddingUnSubscribe(messageId); clientStore.removePaddingUnSubscribe(messageId);
clientStore.removeSubscriptions(pendingUnSubscription.getTopic()); clientStore.removeSubscriptions(pendingUnSubscription.getTopic());
...@@ -143,6 +154,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -143,6 +154,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (pendingPublish == null) { if (pendingPublish == null) {
return; return;
} }
if (logger.isInfoEnabled()) {
String topicName = pendingPublish.getMessage().variableHeader().topicName();
logger.info("MQTT Topic:{} successfully PubAck messageId:{}", topicName, messageId);
}
pendingPublish.onPubAckReceived(); pendingPublish.onPubAckReceived();
clientStore.removePendingPublish(messageId); clientStore.removePendingPublish(messageId);
pendingPublish.getPayload().clear(); pendingPublish.getPayload().clear();
...@@ -188,6 +203,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { ...@@ -188,6 +203,10 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (pendingPublish == null) { if (pendingPublish == null) {
return; return;
} }
if (logger.isInfoEnabled()) {
String topicName = pendingPublish.getMessage().variableHeader().topicName();
logger.info("MQTT Topic:{} successfully PubComp", topicName);
}
pendingPublish.getPayload().clear(); pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived(); pendingPublish.onPubCompReceived();
clientStore.removePendingPublish(messageId); clientStore.removePendingPublish(messageId);
......
...@@ -105,7 +105,7 @@ public final class MqttClient { ...@@ -105,7 +105,7 @@ public final class MqttClient {
.build(); .build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message); MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, listener, message);
Boolean result = Tio.send(context, message); Boolean result = Tio.send(context, message);
logger.debug("MQTT subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result); logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} subscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message)); pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientStore.addPaddingSubscribe(messageId, pendingSubscription); clientStore.addPaddingSubscribe(messageId, pendingSubscription);
return this; return this;
...@@ -125,7 +125,7 @@ public final class MqttClient { ...@@ -125,7 +125,7 @@ public final class MqttClient {
.build(); .build();
MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message); MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message);
Boolean result = Tio.send(context, message); Boolean result = Tio.send(context, message);
logger.debug("MQTT unSubscribe topicFilter:{} messageId:{} result:{}", topicFilter, messageId, result); logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilter, messageId, result);
// 解绑 subManage listener // 解绑 subManage listener
clientStore.addPaddingUnSubscribe(messageId, pendingUnSubscription); clientStore.addPaddingUnSubscribe(messageId, pendingUnSubscription);
pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg)); pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg));
...@@ -187,7 +187,7 @@ public final class MqttClient { ...@@ -187,7 +187,7 @@ public final class MqttClient {
.messageId(messageId) .messageId(messageId)
.build(); .build();
boolean result = Tio.send(context, message); boolean result = Tio.send(context, message);
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result); logger.info("MQTT Topic:{} qos:{} retain:{} publish result:{}", topic, qos, retain, result);
if (isHighLevelQoS) { if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos); MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
clientStore.addPendingPublish(messageId, pendingPublish); clientStore.addPendingPublish(messageId, pendingPublish);
......
...@@ -102,7 +102,7 @@ public class MqttClientAioListener extends DefaultClientAioListener { ...@@ -102,7 +102,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
.build(); .build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, subscription.getListener(), message); MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, subscription.getListener(), message);
Boolean result = Tio.send(context, message); Boolean result = Tio.send(context, message);
logger.info("MQTT reconnect subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result); logger.info("MQTT Topic:{} mqttQoS:{} messageId:{} resubscribing result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message)); pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
clientStore.addPaddingSubscribe(messageId, pendingSubscription); clientStore.addPaddingSubscribe(messageId, pendingSubscription);
} }
......
...@@ -223,11 +223,13 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -223,11 +223,13 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
*/ */
private static final String NOT_FINAL_WEBSOCKET_PACKET_PARTS = "TIO_N_F_W_P_P"; private static final String NOT_FINAL_WEBSOCKET_PACKET_PARTS = "TIO_N_F_W_P_P";
/** /**
* SEC_WEBSOCKET_KEY后缀 * SEC_WEBSOCKET_KEY 后缀
*/ */
private static final String SEC_WEBSOCKET_KEY_SUFFIX = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; private static final String SEC_WEBSOCKET_KEY_SUFFIX = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private static final byte[] SEC_WEBSOCKET_KEY_SUFFIX_BYTES = SEC_WEBSOCKET_KEY_SUFFIX.getBytes(); private static final byte[] SEC_WEBSOCKET_KEY_SUFFIX_BYTES = SEC_WEBSOCKET_KEY_SUFFIX.getBytes();
/**
* websocket子协议
*/
private static final String Sec_Websocket_Protocol = "sec-websocket-protocol"; private static final String Sec_Websocket_Protocol = "sec-websocket-protocol";
private static final HeaderName Header_Name_Sec_Websocket_Protocol = HeaderName.from(Sec_Websocket_Protocol); private static final HeaderName Header_Name_Sec_Websocket_Protocol = HeaderName.from(Sec_Websocket_Protocol);
...@@ -254,7 +256,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -254,7 +256,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
return null; return null;
} }
HttpResponse httpResponse = updateWebSocketProtocol(request); HttpResponse httpResponse = updateWebSocketProtocol(request);
// 非 websocket // 普通 http 非 websocket
if (httpResponse == null) { if (httpResponse == null) {
return request; return request;
} }
...@@ -270,16 +272,16 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -270,16 +272,16 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
if (websocketPacket != null) { if (websocketPacket != null) {
// 数据包尚未完成 // 数据包尚未完成
if (!websocketPacket.isWsEof()) { if (!websocketPacket.isWsEof()) {
List<WsRequest> parts = (List<WsRequest>) channelContext.getAttribute(NOT_FINAL_WEBSOCKET_PACKET_PARTS); List<WsRequest> parts = (List<WsRequest>) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
if (parts == null) { if (parts == null) {
parts = new ArrayList<>(); parts = new ArrayList<>();
channelContext.setAttribute(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts); channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, parts);
} }
parts.add(websocketPacket); parts.add(websocketPacket);
} else { } else {
List<WsRequest> parts = (List<WsRequest>) channelContext.getAttribute(NOT_FINAL_WEBSOCKET_PACKET_PARTS); List<WsRequest> parts = (List<WsRequest>) channelContext.get(NOT_FINAL_WEBSOCKET_PACKET_PARTS);
if (parts != null) { if (parts != null) {
channelContext.setAttribute(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null); channelContext.set(NOT_FINAL_WEBSOCKET_PACKET_PARTS, null);
parts.add(websocketPacket); parts.add(websocketPacket);
WsRequest first = parts.get(0); WsRequest first = parts.get(0);
...@@ -335,7 +337,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -335,7 +337,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
wsResponse = wsMsgHandler.encodeSubProtocol(packet, tioConfig, channelContext); wsResponse = wsMsgHandler.encodeSubProtocol(packet, tioConfig, channelContext);
Objects.requireNonNull(wsResponse, "IWsMsgHandler encodeSubProtocol WsResponse is null."); Objects.requireNonNull(wsResponse, "IWsMsgHandler encodeSubProtocol WsResponse is null.");
} }
// 握手包 // 握手包
if (wsResponse.isHandShake()) { if (wsResponse.isHandShake()) {
WsSessionContext imSessionContext = (WsSessionContext) channelContext.get(); WsSessionContext imSessionContext = (WsSessionContext) channelContext.get();
...@@ -350,7 +351,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -350,7 +351,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
return WsServerEncoder.encode(wsResponse, tioConfig, channelContext); return WsServerEncoder.encode(wsResponse, tioConfig, channelContext);
} }
private WsResponse h(WsRequest websocketPacket, byte[] bytes, Opcode opcode, ChannelContext channelContext) throws Exception { private WsResponse handlerWs(WsRequest websocketPacket, byte[] bytes, Opcode opcode, ChannelContext channelContext) throws Exception {
if (opcode == Opcode.TEXT) { if (opcode == Opcode.TEXT) {
if (bytes == null || bytes.length == 0) { if (bytes == null || bytes.length == 0) {
Tio.remove(channelContext, "错误的websocket包,body为空"); Tio.remove(channelContext, "错误的websocket包,body为空");
...@@ -423,17 +424,13 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -423,17 +424,13 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
wsResponse.setHandShake(true); wsResponse.setHandShake(true);
Tio.send(channelContext, wsResponse); Tio.send(channelContext, wsResponse);
wsSessionContext.setHandshaked(true); wsSessionContext.setHandshaked(true);
wsMsgHandler.onAfterHandshaked(request, httpResponse, channelContext); wsMsgHandler.onAfterHandshaked(request, httpResponse, channelContext);
return; return;
} }
if (!wsRequest.isWsEof()) { if (!wsRequest.isWsEof()) {
return; return;
} }
WsResponse wsResponse = handlerWs(wsRequest, wsRequest.getBody(), wsRequest.getWsOpcode(), channelContext);
WsResponse wsResponse = h(wsRequest, wsRequest.getBody(), wsRequest.getWsOpcode(), channelContext);
if (wsResponse != null) { if (wsResponse != null) {
Tio.send(channelContext, wsResponse); Tio.send(channelContext, wsResponse);
} }
...@@ -468,21 +465,21 @@ public class MqttWebServerAioHandler implements ServerAioHandler { ...@@ -468,21 +465,21 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
*/ */
public HttpResponse updateWebSocketProtocol(HttpRequest request) { public HttpResponse updateWebSocketProtocol(HttpRequest request) {
Map<String, String> headers = request.getHeaders(); Map<String, String> headers = request.getHeaders();
String Sec_WebSocket_Key = headers.get(HttpConst.RequestHeaderKey.Sec_WebSocket_Key); String secWebSocketKey = headers.get(HttpConst.RequestHeaderKey.Sec_WebSocket_Key);
if (StrUtil.isNotBlank(Sec_WebSocket_Key)) { if (StrUtil.isNotBlank(secWebSocketKey)) {
byte[] Sec_WebSocket_Key_Bytes; byte[] secWebSocketKeyBytes;
try { try {
Sec_WebSocket_Key_Bytes = Sec_WebSocket_Key.getBytes(request.getCharset()); secWebSocketKeyBytes = secWebSocketKey.getBytes(request.getCharset());
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
byte[] allBs = new byte[Sec_WebSocket_Key_Bytes.length + SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length]; byte[] allBs = new byte[secWebSocketKeyBytes.length + SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length];
System.arraycopy(Sec_WebSocket_Key_Bytes, 0, allBs, 0, Sec_WebSocket_Key_Bytes.length); System.arraycopy(secWebSocketKeyBytes, 0, allBs, 0, secWebSocketKeyBytes.length);
System.arraycopy(SEC_WEBSOCKET_KEY_SUFFIX_BYTES, 0, allBs, Sec_WebSocket_Key_Bytes.length, SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length); System.arraycopy(SEC_WEBSOCKET_KEY_SUFFIX_BYTES, 0, allBs, secWebSocketKeyBytes.length, SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length);
byte[] key_array = SHA1Util.SHA1(allBs); byte[] keyArray = SHA1Util.SHA1(allBs);
String acceptKey = BASE64Util.byteArrayToBase64(key_array); String acceptKey = BASE64Util.byteArrayToBase64(keyArray);
HttpResponse httpResponse = new HttpResponse(request); HttpResponse httpResponse = new HttpResponse(request);
httpResponse.setStatus(HttpResponseStatus.C101); httpResponse.setStatus(HttpResponseStatus.C101);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册