提交 2d77c9a4 编写于 作者: 浅梦2013's avatar 浅梦2013

📝 更新文档

上级 8d568bcd
......@@ -22,22 +22,26 @@
```java
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883) // 默认:1883
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5) // 默认:3_1_1
.clientId("xxxxxx") // 默认:MICA-MQTT- 前缀和 36进制的纳秒数
.ip("127.0.0.1") // mqtt 服务端 ip 地址
.port(1883) // 默认:1883
.username("admin") // 账号
.password("123456") // 密码
.version(MqttVersion.MQTT_5) // 默认:3_1_1
.clientId("xxxxxx") // 非常重要务必手动设置,一般设备 sn 号,默认:MICA-MQTT- 前缀和 36进制的纳秒数
.bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存
.readBufferSize(512) // 消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度)
.keepAliveSecs(120) // 默认:60s
.timeout(10) // 超时时间,t-io 配置,可为 null,为 null 时,t-io 默认为 5
.reconnect(true) // 是否重连,默认:true
.reInterval(5000) // 重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000
.readBufferSize(512) // 消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度)
.maxBytesInMessage(1024 * 10) // 最大包体长度,如果包体过大需要设置此参数,默认为: 8092
.keepAliveSecs(120) // 默认:60s
.timeout(10) // 超时时间,t-io 配置,可为 null,为 null 时,t-io 默认为 5
.reconnect(true) // 是否重连,默认:true
.reInterval(5000) // 重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000
.willMessage(builder -> {
builder.topic("/test/offline").messageText("hello"); // 遗嘱消息
builder.topic("/test/offline").messageText("hello"); // 遗嘱消息
})
.connectListener((context, isReconnect) -> {
logger.info("链接服务器成功...");
})
.properties() // mqtt5 properties
.properties() // mqtt5 properties
.connect();
// 消息订阅,同类方法 subxxx
......@@ -62,55 +66,57 @@ MqttClient client = MqttClient.create()
```java
// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
// 默认:127.0.0.1
.ip("127.0.0.1")
// 默认:1883
.port(1883)
// 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
.readBufferSize(512)
// 自定义认证
.authHandler((clientId, userName, password) -> true)
// 消息监听
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// 堆内存和堆外内存选择,默认:堆内存
.bufferAllocator(ByteBufferAllocator.HEAP)
// 心跳超时时间,默认:120s
.heartbeatTimeout(120_1000L)
// ssl 配置
.useSsl("", "", "")
// 自定义客户端上下线监听
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定义消息转发,可用 mq 广播实现集群化处理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 开启 debug 信息日志
.start();
// 默认:127.0.0.1
.ip("127.0.0.1")
// 默认:1883
.port(1883)
// 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
.readBufferSize(512)
// 最大包体长度,如果包体过大需要设置此参数,默认为: 8092
.maxBytesInMessage(1024 * 100)
// 自定义认证
.authHandler((clientId, userName, password) -> true)
// 消息监听
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// 堆内存和堆外内存选择,默认:堆内存
.bufferAllocator(ByteBufferAllocator.HEAP)
// 心跳超时时间,默认:120s
.heartbeatTimeout(120_1000L)
// ssl 配置
.useSsl("", "", "")
// 自定义客户端上下线监听
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定义消息转发,可用 mq 广播实现集群化处理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 开启 debug 信息日志
.start();
// 发送给某个客户端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
......
......@@ -47,6 +47,10 @@ public class MqttClientTest {
// 最大包体长度,如果包体过大需要设置此参数
// .maxBytesInMessage(1024 * 10)
.version(MqttVersion.MQTT_5)
// 连接监听
.connectListener((context, isReconnect) -> {
logger.info("链接服务器成功...");
})
.connect();
client.subQos0("/test/#", (topic, payload) -> {
......
......@@ -42,6 +42,8 @@ mqtt:
read-buffer-size: 8092 # 接收数据的 buffer size,默认:8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 长度,默认:8092
debug: true # 如果开启 prometheus 指标收集建议关闭
websocket-enable: true # 开启 websocket 子协议,默认开启
websocket-port: 8083 # websocket 端口,默认:8083
```
### 2.3 可实现接口(注册成 Spring Bean 即可)
......@@ -161,8 +163,9 @@ public class ServerService {
| mqtt.client.password | | 密码 |
| mqtt.client.client-id | | 客户端ID,非常重要, 默认为:MICA-MQTT- 前缀和 36进制的纳秒数 |
| mqtt.client.clean-session | true | 清除会话 <p> false 表示如果订阅的客户机断线了,那么要保存其要推送的消息,如果其重新连接时,则将这些消息推送。 true 表示消除,表示客户机是第一次连接,消息所以以前的连接信息。 </p> |
| mqtt.client.buffer-allocator | 8092 | ByteBuffer Allocator,支持堆内存和堆外内存,默认为:堆内存 |
| mqtt.client.buffer-allocator | 堆内存 | ByteBuffer Allocator,支持堆内存和堆外内存,默认为:堆内存 |
| mqtt.client.read-buffer-size | 8092 | t-io 每次消息读取长度,跟 maxBytesInMessage 相关 |
| mqtt.client.max-bytes-in-message | 8092 | 消息解析最大 bytes 长度,默认:8092 |
| mqtt.client.reconnect | true | 自动重连 |
| mqtt.client.re-interval | 5000 | 重连重试时间,单位毫秒 |
| mqtt.client.timeout | 5 | 超时时间,单位秒,t-io 配置,可为 null |
......@@ -185,11 +188,18 @@ mqtt:
re-interval: 5000 # 重连时间,默认 5000 毫秒
version: MQTT_5 # mqtt 协议版本,默认:3.1.1
read-buffer-size: 8092 # 接收数据的 buffer size,默认:8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 长度,默认:8092
buffer-allocator: heap # 堆内存和堆外内存,默认:堆内存
keep-alive-secs: 60 # keep-alive 时间,单位:秒
clean-session: true # mqtt clean session,默认:true
```
### 3.3 可实现接口(注册成 Spring Bean 即可)
| 接口 | 是否必须 | 说明 |
| --------------------------- | -------------- | ------------------------- |
| IMqttClientConnectListener | 是 | 客户端连接成功监听 |
### 3.3 自定义 java 配置(可选)
```java
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册