Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
mica
mica-mqtt
提交
69e0bcf0
mica-mqtt
项目概览
mica
/
mica-mqtt
通知
71
Star
1
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
mica-mqtt
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
69e0bcf0
编写于
5月 08, 2022
作者:
如梦技术
🐛
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
🐛
重构 topic 匹配规则,修复 gitee #I56BTC
上级
d2c1c8eb
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
110 addition
and
44 deletion
+110
-44
mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java
...reamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java
+1
-3
mica-mqtt-core/pom.xml
mica-mqtt-core/pom.xml
+5
-0
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java
.../mqtt/core/server/session/InMemoryMqttSessionManager.java
+1
-1
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java
.../iot/mqtt/core/server/store/InMemoryMqttMessageStore.java
+1
-3
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java
...c/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java
+50
-37
mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/util/TopicUtilTest.java
...st/java/net/dreamlu/iot/mqtt/core/util/TopicUtilTest.java
+52
-0
未找到文件。
mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java
浏览文件 @
69e0bcf0
...
...
@@ -27,7 +27,6 @@ import net.dreamlu.mica.redis.cache.MicaRedisCache;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.regex.Pattern
;
/**
* redis mqtt 遗嘱和保留消息存储
...
...
@@ -71,14 +70,13 @@ public class RedisMqttMessageStore implements IMqttMessageStore {
@Override
public
List
<
Message
>
getRetainMessage
(
String
topicFilter
)
{
List
<
Message
>
retainMessageList
=
new
ArrayList
<>();
Pattern
topicPattern
=
TopicUtil
.
getTopicPattern
(
topicFilter
);
RedisKeys
redisKey
=
RedisKeys
.
MESSAGE_STORE_RETAIN
;
String
redisKeyPrefix
=
redisKey
.
getKey
();
String
redisKeyPattern
=
redisKeyPrefix
.
concat
(
RedisUtil
.
getTopicPattern
(
topicFilter
));
int
keyPrefixLength
=
redisKeyPrefix
.
length
();
redisCache
.
scan
(
redisKeyPattern
,
(
key
)
->
{
String
keySuffix
=
key
.
substring
(
keyPrefixLength
);
if
(
topicPattern
.
matcher
(
keySuffix
).
matches
(
))
{
if
(
TopicUtil
.
match
(
topicFilter
,
keySuffix
))
{
Message
message
=
redisCache
.
get
(
key
,
messageSerializer:
:
deserialize
);
if
(
message
!=
null
)
{
retainMessageList
.
add
(
message
);
...
...
mica-mqtt-core/pom.xml
浏览文件 @
69e0bcf0
...
...
@@ -32,6 +32,11 @@
<groupId>
org.t-io
</groupId>
<artifactId>
tio-websocket-server
</artifactId>
</dependency>
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<scope>
test
</scope>
</dependency>
</dependencies>
</project>
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java
浏览文件 @
69e0bcf0
...
...
@@ -89,7 +89,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
Integer
qosValue
=
null
;
Set
<
String
>
topicFilterSet
=
subscribeStore
.
keySet
();
for
(
String
topicFilter
:
topicFilterSet
)
{
if
(
TopicUtil
.
getTopicPattern
(
topicFilter
).
matcher
(
topicName
).
matches
(
))
{
if
(
TopicUtil
.
match
(
topicFilter
,
topicName
))
{
ConcurrentMap
<
String
,
Integer
>
data
=
subscribeStore
.
get
(
topicFilter
);
if
(
data
!=
null
&&
!
data
.
isEmpty
())
{
Integer
mqttQoS
=
data
.
get
(
clientId
);
...
...
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java
浏览文件 @
69e0bcf0
...
...
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.regex.Pattern
;
/**
* message store
...
...
@@ -72,10 +71,9 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore {
@Override
public
List
<
Message
>
getRetainMessage
(
String
topicFilter
)
{
Pattern
topicPattern
=
TopicUtil
.
getTopicPattern
(
topicFilter
);
List
<
Message
>
retainMessageList
=
new
ArrayList
<>();
retainStore
.
forEach
((
topic
,
message
)
->
{
if
(
topicPattern
.
matcher
(
topic
).
matches
(
))
{
if
(
TopicUtil
.
match
(
topicFilter
,
topic
))
{
retainMessageList
.
add
(
message
);
}
});
...
...
mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java
浏览文件 @
69e0bcf0
...
...
@@ -16,19 +16,12 @@
package
net.dreamlu.iot.mqtt.core.util
;
import
net.dreamlu.iot.mqtt.codec.MqttCodecUtil
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.regex.Pattern
;
/**
* Mqtt Topic 工具
*
* @author L.cm
*/
public
final
class
TopicUtil
{
private
static
final
Map
<
String
,
Pattern
>
TOPIC_FILTER_PATTERN_CACHE
=
new
ConcurrentHashMap
<>(
32
);
/**
* 判断 topicFilter topicName 是否匹配
...
...
@@ -38,37 +31,57 @@ public final class TopicUtil {
* @return 是否匹配
*/
public
static
boolean
match
(
String
topicFilter
,
String
topicName
)
{
if
(
MqttCodecUtil
.
isTopicFilter
(
topicFilter
))
{
return
getTopicPattern
(
topicFilter
).
matcher
(
topicName
).
matches
();
}
else
{
return
topicFilter
.
equals
(
topicName
);
char
[]
topicFilterChars
=
topicFilter
.
toCharArray
();
char
[]
topicNameChars
=
topicName
.
toCharArray
();
int
topicFilterLength
=
topicFilterChars
.
length
;
int
topicNameLength
=
topicNameChars
.
length
;
int
topicFilterIdxEnd
=
topicFilterLength
-
1
;
char
ch
;
// 是否进入 + 号层级通配符
boolean
inLayerWildcard
=
false
;
for
(
int
i
=
0
;
i
<
topicFilterLength
;
i
++)
{
ch
=
topicFilterChars
[
i
];
if
(
ch
==
'#'
)
{
// 校验: # 通配符只能在最后一位
if
(
i
<
topicFilterIdxEnd
)
{
throw
new
IllegalArgumentException
(
"Mqtt subscribe topicFilter illegal:"
+
topicFilter
);
}
return
true
;
}
else
if
(
ch
==
'+'
)
{
// 校验: 单独 + 是允许的,判断 + 号前一位是否为 /
if
(
i
>
0
&&
topicFilterChars
[
i
-
1
]
!=
'/'
)
{
throw
new
IllegalArgumentException
(
"Mqtt subscribe topicFilter illegal:"
+
topicFilter
);
}
// 如果 + 是最后一位,判断 topicName 中是否还存在层级 /
if
(
i
==
topicFilterIdxEnd
&&
topicNameLength
>
i
)
{
for
(
int
j
=
i
;
j
<
topicNameLength
;
j
++)
{
if
(
topicNameChars
[
j
]
==
'/'
)
{
return
false
;
}
}
}
inLayerWildcard
=
true
;
continue
;
}
else
if
(
ch
==
'/'
)
{
if
(
inLayerWildcard
)
{
inLayerWildcard
=
false
;
}
// 预读下一位,如果是 #,并且 topicName 位数已经不足
int
next
=
i
+
1
;
if
((
topicFilterLength
>
next
)
&&
topicFilterChars
[
next
]
==
'#'
&&
topicNameLength
<
next
)
{
return
true
;
}
}
// topicName 长度不够了
if
(
topicNameLength
<
i
)
{
return
false
;
}
// 进入通配符
if
(!
inLayerWildcard
&&
ch
!=
topicNameChars
[
i
])
{
return
false
;
}
}
}
/**
* mqtt topicFilter 转正则
*
* @param topicFilter topicFilter
* @return Pattern
*/
public
static
Pattern
getTopicPattern
(
String
topicFilter
)
{
return
TOPIC_FILTER_PATTERN_CACHE
.
computeIfAbsent
(
topicFilter
,
TopicUtil:
:
getTopicFilterPattern
);
}
/**
* mqtt topicFilter 转正则
*
* @param topicFilter topicFilter
* @return Pattern
*/
public
static
Pattern
getTopicFilterPattern
(
String
topicFilter
)
{
// mqtt 分享主题 $share/{ShareName}/{filter}
String
topicRegex
=
topicFilter
.
startsWith
(
"$"
)
?
"\\"
+
topicFilter
:
topicFilter
;
return
Pattern
.
compile
(
topicRegex
.
replace
(
"+"
,
"[^/]+"
)
.
replace
(
"#"
,
".+"
)
.
concat
(
"$"
)
);
return
true
;
}
/**
...
...
mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/util/TopicUtilTest.java
0 → 100644
浏览文件 @
69e0bcf0
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
net.dreamlu.iot.mqtt.core.util
;
import
org.junit.Assert
;
import
org.junit.Test
;
/**
* TopicUtil 测试
*
* @author L.cm
*/
public
class
TopicUtilTest
{
@Test
public
void
test
()
{
// gitee issues #I56BTC /iot/test/# 无法匹配到 /iot/test 和 /iot/test/
Assert
.
assertFalse
(
TopicUtil
.
match
(
"+"
,
"/iot/test"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"+"
,
"iot/test"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"+"
,
"/iot/test"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"+"
,
"/iot"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"+/test"
,
"/iot/test"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"/iot/test/+/"
,
"/iot/test/123"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/+"
,
"/iot/test/123"
));
Assert
.
assertFalse
(
TopicUtil
.
match
(
"/iot/test/+"
,
"/iot/test/123/"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"#"
,
"/iot/test"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/#"
,
"/iot/test"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/#"
,
"/iot/test"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/#"
,
"/iot/test/"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/#"
,
"/iot/test/123123/12312"
));
Assert
.
assertTrue
(
TopicUtil
.
match
(
"/iot/test/123"
,
"/iot/test/123"
));
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录