提交 622031c4 编写于 作者: 浅梦2013's avatar 浅梦2013

Merge branch 'master' of https://github.com/lets-mica/mica-mqtt

...@@ -4,7 +4,8 @@ ...@@ -4,7 +4,8 @@
### v1.2.2 - 2021-12-26 ### v1.2.2 - 2021-12-26
- :sparkles: mica-mqtt server 添加发布权限接口,无权限直接断开连接,避免高级别 qos 重试浪费资源。 - :sparkles: mica-mqtt server 添加发布权限接口,无权限直接断开连接,避免高级别 qos 重试浪费资源。
- :sparkles: mica-mqtt-broker 优化节点信息存储 - :sparkles: mica-mqtt-broker 优化节点信息存储
- :sparkles: mqtt client 抽象 IMqttClientSession 接口。 - :sparkles: mica-mqtt client 重复订阅优化。感谢 `@一片小雨滴`
- :sparkles: mica-mqtt client 抽象 IMqttClientSession 接口。
- :bug: 修复重构 AbstractMqttMessageDispatcher 保持跟 mica-mqtt-broker 逻辑一致 gitee #I4MA6A 感谢 `@胡萝博` - :bug: 修复重构 AbstractMqttMessageDispatcher 保持跟 mica-mqtt-broker 逻辑一致 gitee #I4MA6A 感谢 `@胡萝博`
- :arrow_up: mica-mqtt-example 升级 log4j2 到 2.17.0 - :arrow_up: mica-mqtt-example 升级 log4j2 到 2.17.0
......
...@@ -143,11 +143,3 @@ mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes())); ...@@ -143,11 +143,3 @@ mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 停止服务 // 停止服务
mqttServer.stop(); mqttServer.stop();
``` ```
## 基于 mq 消息广播集群处理
- 实现 `IMqttConnectStatusListener` 处理设备状态存储。
- 实现 `IMqttMessageListener` 将消息转发到 mq,业务按需处理 mq 消息。
- 实现 `IMqttMessageStore` 存储遗嘱和保留消息。
- 实现 `AbstractMqttMessageDispatcher` 将消息发往 mq,mq 再广播回 mqtt 集群,mqtt 将消息发送到设备。
- 业务消息发送到 mq,mq 广播到 mqtt 集群,mqtt 将消息发送到设备。
...@@ -33,8 +33,9 @@ public interface IMqttServerPublishPermission { ...@@ -33,8 +33,9 @@ public interface IMqttServerPublishPermission {
* @param clientId 客户端 id * @param clientId 客户端 id
* @param topic topic * @param topic topic
* @param qoS MqttQoS * @param qoS MqttQoS
* @param isRetain 是否保留消息
* @return 否有发布权限 * @return 否有发布权限
*/ */
boolean hasPermission(ChannelContext context, String clientId, String topic, MqttQoS qoS); boolean hasPermission(ChannelContext context, String clientId, String topic, MqttQoS qoS, boolean isRetain);
} }
...@@ -199,7 +199,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { ...@@ -199,7 +199,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttPublishVariableHeader variableHeader = message.variableHeader(); MqttPublishVariableHeader variableHeader = message.variableHeader();
String topicName = variableHeader.topicName(); String topicName = variableHeader.topicName();
// 1. 判断是否有发布权限,没有权限则断开 mqtt 连接 mqtt 5.x qos1、qos2 可以响应 reasonCode // 1. 判断是否有发布权限,没有权限则断开 mqtt 连接 mqtt 5.x qos1、qos2 可以响应 reasonCode
if (publishPermission != null && !publishPermission.hasPermission(context, clientId, topicName, mqttQoS)) { if (publishPermission != null && !publishPermission.hasPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
Tio.remove(context, "Mqtt clientId:" + clientId + " publish topic: " + topicName + " no permission."); Tio.remove(context, "Mqtt clientId:" + clientId + " publish topic: " + topicName + " no permission.");
return; return;
} }
...@@ -428,7 +428,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { ...@@ -428,7 +428,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
message.setPayload(payload); message.setPayload(payload);
} }
message.setMessageType(MessageType.UP_STREAM); message.setMessageType(MessageType.UP_STREAM);
message.setRetain(fixedHeader.isRetain()); message.setRetain(isRetain);
message.setDup(fixedHeader.isDup()); message.setDup(fixedHeader.isDup());
message.setTimestamp(System.currentTimeMillis()); message.setTimestamp(System.currentTimeMillis());
Node clientNode = context.getClientNode(); Node clientNode = context.getClientNode();
......
...@@ -63,7 +63,8 @@ mqtt: ...@@ -63,7 +63,8 @@ mqtt:
| --------------------------- | -------------- | ------------------------- | | --------------------------- | -------------- | ------------------------- |
| IMqttServerUniqueIdService | 否 | 用于 clientId 不唯一时,自定义实现唯一标识,后续接口使用它替代 clientId | | IMqttServerUniqueIdService | 否 | 用于 clientId 不唯一时,自定义实现唯一标识,后续接口使用它替代 clientId |
| IMqttServerAuthHandler | 是 | 用于服务端认证 | | IMqttServerAuthHandler | 是 | 用于服务端认证 |
| IMqttServerSubscribeValidator | 是 | 1.1.3 新增,用于服务端订阅校验 | | IMqttServerSubscribeValidator | 否(建议实现) | 1.1.3 新增,用于对客户端订阅校验 |
| IMqttServerPublishPermission | 否(建议实现) | 1.2.2 新增,用于对客户端发布权限校验 |
| IMqttMessageListener | 是 | 消息监听 | | IMqttMessageListener | 是 | 消息监听 |
| IMqttConnectStatusListener | 是 | 连接状态监听 | | IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttSessionManager | 否 | session 管理 | | IMqttSessionManager | 否 | session 管理 |
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册