提交 db453e1d 编写于 作者: 如梦技术's avatar 如梦技术 🐛

代码完善。

上级 69e77540
# 变更记录
## 发行版本
### v1.0.0 - 2021-07-31
- :sparkles: 基于低延迟高性能的 t-io AIO 框架。
- :sparkles: 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
- :sparkles: 支持 MQTT client 客户端。
- :sparkles: 支持 MQTT server 服务端。
- :sparkles: 支持 MQTT 遗嘱消息。
- :sparkles: 支持 MQTT 保留消息。
- :sparkles: 支持自定义消息(mq)处理转发实现集群。
- :sparkles: 支持 GraalVM 编译成本机可执行程序。
\ No newline at end of file
......@@ -4,25 +4,29 @@
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/30dad82f79f34e41bafbc3cef6b68fc3)](https://www.codacy.com/gh/lets-mica/mica-mqtt/dashboard?utm_source=github.com&utm_medium=referral&utm_content=lets-mica/mica-mqtt&utm_campaign=Badge_Grade)
[![GitHub](https://img.shields.io/github/license/lets-mica/mica-mqtt.svg?style=flat-square)](https://github.com/lets-mica/mica-mqtt/blob/master/LICENSE)
基于 `t-io` 实现的**低延迟****高性能**`mqtt` 物联网组件。
基于 `t-io` 实现的**低延迟****高性能**`mqtt` 物联网组件。更多使用方式详见: **mica-mqtt-example** 模块。
## 功能
- [x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
- [x] MQTT client 客户端。
- [x] MQTT server 服务端。
- [x] 支持 MQTT client 客户端。
- [x] 支持 MQTT server 服务端。
- [x] 支持 MQTT 遗嘱消息。
- [x] 支持 MQTT 保留消息。
- [x] 支持自定义消息(mq)处理转发实现集群。
- [x] MQTT 客户端 阿里云 mqtt 连接 demo。
## 待办
- [ ] 完善使用文档。
- [ ] 添加 Spring boot stater。
- [ ] 添加 websocket 支持。
## 文档
- [mica-mqtt 使用文档](docs/docs.md)
- [mica-mqtt 发行版本](CHANGELOG.md)
- [t-io 官方文档](https://www.tiocloud.com/doc/tio/85)
- [mqtt 协议文档](https://github.com/mcxiaoke/mqtt)
## 快速开始
查看 `mica-mqtt-example` 中有 `mqtt` 服务端和客户端演示代码, `main` 方法运行即可。
查看 **mica-mqtt-example** 中有 `mqtt` 服务端和客户端演示代码, `main` 方法运行即可。
### 1. 启动 Server 端
......@@ -133,16 +137,23 @@ Mica-Mqtt-Server
详见 `mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java`
## 参考vs借鉴
- [netty codec-mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt)
- [netty codec mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt)
- [jmqtt](https://github.com/Cicizz/jmqtt)
- [iot-mqtt-server](https://gitee.com/recallcode/iot-mqtt-server)
- [moquette](https://github.com/moquette-io/moquette)
- [netty-mqtt-client](https://github.com/jetlinks/netty-mqtt-client)
## 工具
## mqtt 客户端工具
- [mqttx 优雅的跨平台 MQTT 5.0 客户端工具](https://mqttx.app/cn/)
- [mqttx.fx mqtt 客户端](http://mqttfx.org/)
## 开源推荐
- `Avue` 一款基于 vue 可配置化的神奇框架:[https://gitee.com/smallweigit/avue](https://gitee.com/smallweigit/avue)
- `pig` 宇宙最强微服务(架构师必备):[https://gitee.com/log4j/pig](https://gitee.com/log4j/pig)
- `SpringBlade` 完整的线上解决方案(企业开发必备):[https://gitee.com/smallc/SpringBlade](https://gitee.com/smallc/SpringBlade)
- `IJPay` 支付 SDK 让支付触手可及:[https://gitee.com/javen205/IJPay](https://gitee.com/javen205/IJPay)
- `JustAuth` 史上最全的整合第三方登录的开源库: [https://github.com/zhangyd-c/JustAuth](https://github.com/zhangyd-c/JustAuth)
- `spring-boot-demo` 深度学习并实战 spring boot 的项目: [https://github.com/xkcoding/spring-boot-demo](https://github.com/xkcoding/spring-boot-demo)
## 微信公众号
![如梦技术](docs/img/dreamlu-weixin.jpg)
......
# 文档备忘
## topic 通配符含义
- `/`:用来表示层次,比如a/b,a/b/c。
- `#`:表示匹配>=0个层次,比如a/#就匹配a/,a/b,a/b/c。单独的一个#表示匹配所有。不允许 a#和a/#/c。
- `+`:表示匹配一个层次,例如a/+匹配a/b,a/c,不匹配a/b/c。单独的一个+是允许的,a+不允许,a/+/b不允许
\ No newline at end of file
- `/`:用来表示层次,比如 a/b,a/b/c。
- `#`:表示匹配 `>=0` 个层次,比如 a/# 就匹配 a/,a/b,a/b/c。单独的一个 # 表示匹配所有。不允许 a# 和 a/#/c。
- `+`:表示匹配一个层次,例如 a/+ 匹配 a/b,a/c,不匹配 a/b/c。单独的一个 + 是允许的,a+ 不允许,a/+/b 不允许。
## 客户端使用
```java
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883) // 默认:1883
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5) // 默认:3_1_1
.clientId("xxxxxx") // 默认:MICA-MQTT- 前缀和 36进制的纳秒数
.bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存
.readBufferSize(512) // 消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度)
.keepAliveSecs(120) // 默认:60s
.timeout(10) // 超时时间,t-io 配置,可为 null,为 null 时,t-io 默认为 5
.reconnect(true) // 是否重连,默认:true
.reInterval(5000) // 重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000
.willMessage(builder -> {
builder.topic("/test/offline").messageText("hello"); // 遗嘱消息
})
.properties() // mqtt5 properties
.connect();
// 消息订阅,同类方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// 取消订阅
client.unSubscribe("/test/#");
// 发送消息
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// 断开连接
client.disconnect();
// 重连
client.reconnect();
// 停止
client.stop();
```
## 服务端使用
```java
// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
// 默认:127.0.0.1
.ip("127.0.0.1")
// 默认:1883
.port(1883)
// 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
.readBufferSize(512)
// 自定义认证
.authHandler((clientId, userName, password) -> true)
// 消息监听
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// 堆内存和堆外内存选择,默认:堆内存
.bufferAllocator(ByteBufferAllocator.HEAP)
// 心跳超时时间,默认:120s
.heartbeatTimeout(120_1000L)
// ssl 配置
.useSsl("", "", "")
// 自定义客户端上下线监听
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定义消息转发,可用 mq 广播实现集群化处理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 开启 debug 信息日志
.start();
// 发送给某个客户端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()), MqttQoS.EXACTLY_ONCE);
// 发送给所有在线监听这个 topic 的客户端
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()), MqttQoS.EXACTLY_ONCE);
// 停止服务
mqttServer.stop();
```
\ No newline at end of file
......@@ -62,7 +62,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
.username(clientConfig.getUsername())
.keepAlive(clientConfig.getKeepAliveSecs())
.cleanSession(clientConfig.isCleanSession())
.protocolVersion(clientConfig.getProtocolVersion())
.protocolVersion(clientConfig.getVersion())
.willFlag(willMessage != null);
// 2. 密码
String password = clientConfig.getPassword();
......
......@@ -86,7 +86,7 @@ public final class MqttClientCreator {
/**
* mqtt 协议,默认:3_1_1
*/
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1_1;
private MqttVersion version = MqttVersion.MQTT_3_1_1;
/**
* 用户名
*/
......@@ -156,8 +156,8 @@ public final class MqttClientCreator {
return clientId;
}
public MqttVersion getProtocolVersion() {
return protocolVersion;
public MqttVersion getVersion() {
return version;
}
public String getUsername() {
......@@ -234,8 +234,8 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator protocolVersion(MqttVersion protocolVersion) {
this.protocolVersion = protocolVersion;
public MqttClientCreator version(MqttVersion version) {
this.version = version;
return this;
}
......
......@@ -205,6 +205,15 @@ public class MqttServerCreator {
return this;
}
public IMqttMessageDispatcher getMessageDispatcher() {
return messageDispatcher;
}
public MqttServerCreator messageDispatcher(IMqttMessageDispatcher messageDispatcher) {
this.messageDispatcher = messageDispatcher;
return this;
}
public IMqttMessageStore getMessageStore() {
return messageStore;
}
......
......@@ -41,7 +41,7 @@ public class MqttClientTest {
.ip("127.0.0.1")
.username("admin")
.password("123456")
.protocolVersion(MqttVersion.MQTT_5)
.version(MqttVersion.MQTT_5)
.connect();
client.subQos0("/test/#", (topic, payload) -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册