提交 f123f171 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善场景示例。

上级 489275d9
......@@ -9,22 +9,22 @@
### MQTT 遗嘱消息场景
> 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为 `offline`,手机App B 订阅这个遗嘱主题。
> 当 A 异常断开时,手机App B 会收到这个 `offline` 的遗嘱消息,从而知道设备 A 离线了。
- 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为 `offline`,手机App B 订阅这个遗嘱主题。
- 当 A 异常断开时,手机App B 会收到这个 `offline` 的遗嘱消息,从而知道设备 A 离线了。
### MQTT 保留消息场景
> 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。
> 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。
> 借助保留消息,新的订阅者能够立即获取最近的状态。
- 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。
- 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。
- 借助保留消息,新的订阅者能够立即获取最近的状态。
### mica-mqtt 多个客户端直接交互
> A APP 端订阅 `/a/door/open`,
> B web 网页端 mqtt.js 订阅 `/a/door/open`,
> Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。
> C 发布 `/a/door/open`
> 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。
- A APP 端订阅 `/a/door/open`
- B web 网页端 mqtt.js 订阅 `/a/door/open`
- Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。
- C 发布 `/a/door/open`
- 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。
## 客户端使用
```java
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
import net.dreamlu.iot.mqtt.client.MqttClientTest;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 设备 A,这里默认 APP 应用端
*
* @author L.cm
*/
public class DeviceA {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connect();
client.subQos0("/a/door/open", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
import net.dreamlu.iot.mqtt.client.MqttClientTest;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 设备 B,这里默认 web 端
*
* @author L.cm
*/
public class DeviceB {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connect();
client.subQos0("/a/door/open", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
/**
* 设备 C,每 5 秒上报一个数据
*
* @author L.cm
*/
public class DeviceC {
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connect();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/a/door/open", ByteBuffer.wrap("open".getBytes(StandardCharsets.UTF_8)));
}
}, 5000, 5000);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
/**
* 服务端,单纯的做消息转发
*
* @author L.cm
*/
public class Server {
/**
* 客户端 A 模拟 APP 端订阅 `/a/door/open`,
* 客户端 B 模拟 web 网页端 mqtt.js 订阅 `/a/door/open`,
* Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。
* 客户端 C 定时上报转态给 `/a/door/open`
* 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。
*/
public static void main(String[] args) {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
message.setPayload(payload.array());
messageDispatcher.send(message);
};
// 3. 启动服务
MqttServer.create()
.ip("0.0.0.0")
.port(1883)
.readBufferSize(512)
.messageDispatcher(messageDispatcher)
.messageListener(messageListener)
.debug()
.start();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册