Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
java胎教生
mica-mqtt
提交
86ab7ebe
mica-mqtt
项目概览
java胎教生
/
mica-mqtt
与 Fork 源项目一致
Fork自
mica / mica-mqtt
通知
5
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
mica-mqtt
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
86ab7ebe
编写于
8月 10, 2021
作者:
浅梦2013
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
✨
mqtt 日志优化。
上级
e1f6c182
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
14 addition
and
15 deletion
+14
-15
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java
...amlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java
+0
-1
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java
...ain/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java
+6
-6
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java
.../mqtt/core/server/support/DefaultMqttServerProcessor.java
+8
-8
未找到文件。
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java
浏览文件 @
86ab7ebe
...
...
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
import
org.tio.core.ChannelContext
;
import
org.tio.core.Node
;
import
org.tio.core.Tio
;
import
org.tio.core.TioConfig
;
import
java.nio.ByteBuffer
;
import
java.util.List
;
...
...
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java
浏览文件 @
86ab7ebe
...
...
@@ -125,12 +125,12 @@ public final class MqttServer {
public
boolean
publish
(
String
clientId
,
String
topic
,
ByteBuffer
payload
,
MqttQoS
qos
,
boolean
retain
)
{
ChannelContext
context
=
Tio
.
getByBsId
(
getServerConfig
(),
clientId
);
if
(
context
==
null
||
context
.
isClosed
)
{
logger
.
warn
(
"Mqtt
publish to clientId:{} ChannelContext is null may be disconnected."
,
clientId
);
logger
.
warn
(
"Mqtt
Topic:{} publish to clientId:{} ChannelContext is null may be disconnected."
,
topic
,
clientId
);
return
false
;
}
List
<
Subscribe
>
subscribeList
=
sessionManager
.
searchSubscribe
(
topic
,
clientId
);
if
(
subscribeList
.
isEmpty
())
{
logger
.
warn
(
"Mqtt
publish but clientId:{} subscribeList is empty."
,
clientId
);
logger
.
warn
(
"Mqtt
Topic:{} publish but clientId:{} subscribeList is empty."
,
topic
,
clientId
);
return
false
;
}
for
(
Subscribe
subscribe
:
subscribeList
)
{
...
...
@@ -163,7 +163,7 @@ public final class MqttServer {
.
messageId
(
messageId
)
.
build
();
boolean
result
=
Tio
.
send
(
context
,
message
);
logger
.
debug
(
"MQTT publish topic:{} qos:{} retain:{}
result:{}"
,
topic
,
qos
,
retain
,
result
);
logger
.
info
(
"MQTT Topic:{} qos:{} retain:{} publish
result:{}"
,
topic
,
qos
,
retain
,
result
);
if
(
isHighLevelQoS
)
{
MqttPendingPublish
pendingPublish
=
new
MqttPendingPublish
(
payload
,
message
,
qos
);
sessionManager
.
addPendingPublish
(
clientId
,
messageId
,
pendingPublish
);
...
...
@@ -220,14 +220,14 @@ public final class MqttServer {
// 查找订阅该 topic 的客户端
List
<
Subscribe
>
subscribeList
=
sessionManager
.
searchSubscribe
(
topic
);
if
(
subscribeList
.
isEmpty
())
{
logger
.
warn
(
"Mqtt
publish but topic:{}
subscribe client list is empty."
,
topic
);
logger
.
warn
(
"Mqtt
Topic:{} publishAll but
subscribe client list is empty."
,
topic
);
return
false
;
}
for
(
Subscribe
subscribe
:
subscribeList
)
{
String
clientId
=
subscribe
.
getClientId
();
ChannelContext
context
=
Tio
.
getByBsId
(
getServerConfig
(),
clientId
);
if
(
context
==
null
||
context
.
isClosed
)
{
logger
.
warn
(
"Mqtt
publish to clientId:{} ChannelContext may be disconnected."
,
clientId
);
logger
.
warn
(
"Mqtt
Topic:{} publish to clientId:{} channel is null may be disconnected."
,
topic
,
clientId
);
continue
;
}
int
subMqttQoS
=
subscribe
.
getMqttQoS
();
...
...
@@ -243,7 +243,7 @@ public final class MqttServer {
try
{
sessionManager
.
clean
();
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Mqtt
s
erver stop session clean error."
,
e
);
logger
.
error
(
"Mqtt
S
erver stop session clean error."
,
e
);
}
this
.
executor
.
shutdown
();
return
result
;
...
...
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java
浏览文件 @
86ab7ebe
...
...
@@ -127,7 +127,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
.
sessionPresent
(
false
)
.
build
();
Tio
.
send
(
context
,
message
);
logger
.
debug
(
"Connect ack send - clientId: {} returnCode:{}"
,
clientId
,
returnCode
);
logger
.
info
(
"Connect ack send - clientId: {} returnCode:{}"
,
clientId
,
returnCode
);
}
@Override
...
...
@@ -174,7 +174,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
public
void
processPubAck
(
ChannelContext
context
,
MqttMessageIdVariableHeader
variableHeader
)
{
int
messageId
=
variableHeader
.
messageId
();
String
clientId
=
context
.
getBsId
();
logger
.
debug
(
"PubAck - clientId:{}, messageId:
{}"
,
clientId
,
messageId
);
logger
.
debug
(
"PubAck - clientId:{}, messageId:{}"
,
clientId
,
messageId
);
MqttPendingPublish
pendingPublish
=
sessionManager
.
getPendingPublish
(
clientId
,
messageId
);
if
(
pendingPublish
==
null
)
{
return
;
...
...
@@ -188,7 +188,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
public
void
processPubRec
(
ChannelContext
context
,
MqttMessageIdVariableHeader
variableHeader
)
{
String
clientId
=
context
.
getBsId
();
int
messageId
=
variableHeader
.
messageId
();
logger
.
debug
(
"PubRec - clientId:{}, messageId:
{}"
,
clientId
,
messageId
);
logger
.
debug
(
"PubRec - clientId:{}, messageId:{}"
,
clientId
,
messageId
);
MqttPendingPublish
pendingPublish
=
sessionManager
.
getPendingPublish
(
clientId
,
messageId
);
if
(
pendingPublish
==
null
)
{
return
;
...
...
@@ -207,7 +207,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
public
void
processPubRel
(
ChannelContext
context
,
MqttMessageIdVariableHeader
variableHeader
)
{
String
clientId
=
context
.
getBsId
();
int
messageId
=
variableHeader
.
messageId
();
logger
.
debug
(
"PubRel - clientId:{}, messageId:
{}"
,
clientId
,
messageId
);
logger
.
debug
(
"PubRel - clientId:{}, messageId:{}"
,
clientId
,
messageId
);
MqttPendingQos2Publish
pendingQos2Publish
=
sessionManager
.
getPendingQos2Publish
(
clientId
,
messageId
);
if
(
pendingQos2Publish
!=
null
)
{
MqttPublishMessage
incomingPublish
=
pendingQos2Publish
.
getIncomingPublish
();
...
...
@@ -229,7 +229,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
public
void
processPubComp
(
ChannelContext
context
,
MqttMessageIdVariableHeader
variableHeader
)
{
int
messageId
=
variableHeader
.
messageId
();
String
clientId
=
context
.
getBsId
();
logger
.
debug
(
"PubComp - clientId:{}, messageId:
{}"
,
clientId
,
messageId
);
logger
.
debug
(
"PubComp - clientId:{}, messageId:{}"
,
clientId
,
messageId
);
MqttPendingPublish
pendingPublish
=
sessionManager
.
getPendingPublish
(
clientId
,
messageId
);
if
(
pendingPublish
!=
null
)
{
pendingPublish
.
getPayload
().
clear
();
...
...
@@ -253,8 +253,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
mqttQosList
.
add
(
mqttQoS
);
topicList
.
add
(
topicName
);
sessionManager
.
addSubscribe
(
topicName
,
clientId
,
mqttQoS
);
logger
.
debug
(
"Subscribe - clientId:{} messageId:{} topicFilter:{} mqttQoS:{}"
,
clientId
,
messageId
,
topicName
,
mqttQoS
);
}
logger
.
info
(
"Subscribe - clientId:{} Topic:{} mqttQoS:{} messageId:{}"
,
clientId
,
topicList
,
mqttQosList
,
messageId
);
// 3. 返回 ack
MqttMessage
subAckMessage
=
MqttMessageBuilders
.
subAck
()
.
addGrantedQosList
(
mqttQosList
)
...
...
@@ -277,8 +277,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
List
<
String
>
topicFilterList
=
message
.
payload
().
topics
();
for
(
String
topicFilter
:
topicFilterList
)
{
sessionManager
.
removeSubscribe
(
topicFilter
,
clientId
);
logger
.
debug
(
"UnSubscribe - clientId:{} messageId:{} topicFilter:{}"
,
clientId
,
messageId
,
topicFilter
);
}
logger
.
info
(
"UnSubscribe - clientId:{} Topic:{} messageId:{}"
,
clientId
,
topicFilterList
,
messageId
);
MqttMessage
unSubMessage
=
MqttMessageBuilders
.
unsubAck
()
.
packetId
(
messageId
)
.
build
();
...
...
@@ -295,7 +295,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
@Override
public
void
processDisConnect
(
ChannelContext
context
)
{
String
clientId
=
context
.
getBsId
();
logger
.
info
(
"DisConnect - clientId:{}
"
,
clientId
);
logger
.
info
(
"DisConnect - clientId:{}
contextId:{}"
,
clientId
,
context
.
getId
()
);
// 设置正常断开的标识
context
.
set
(
MqttConst
.
DIS_CONNECTED
,
(
byte
)
1
);
Tio
.
remove
(
context
,
"Mqtt DisConnect"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录