diff --git a/mica-mqtt-core/README.md b/mica-mqtt-core/README.md index 1b6c77c893427dced3be7e36fd0560935c1965aa..b2112bb2f2e86107feb16d5144f9ec0788c38dbf 100644 --- a/mica-mqtt-core/README.md +++ b/mica-mqtt-core/README.md @@ -9,22 +9,22 @@ ### MQTT 遗嘱消息场景 -> 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为 `offline`,手机App B 订阅这个遗嘱主题。 -> 当 A 异常断开时,手机App B 会收到这个 `offline` 的遗嘱消息,从而知道设备 A 离线了。 +- 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为 `offline`,手机App B 订阅这个遗嘱主题。 +- 当 A 异常断开时,手机App B 会收到这个 `offline` 的遗嘱消息,从而知道设备 A 离线了。 ### MQTT 保留消息场景 -> 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。 -> 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。 -> 借助保留消息,新的订阅者能够立即获取最近的状态。 +- 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。 +- 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。 +- 借助保留消息,新的订阅者能够立即获取最近的状态。 ### mica-mqtt 多个客户端直接交互 -> A APP 端订阅 `/a/door/open`, -> B web 网页端 mqtt.js 订阅 `/a/door/open`, -> Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。 -> C 发布 `/a/door/open` -> 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。 +- A APP 端订阅 `/a/door/open`, +- B web 网页端 mqtt.js 订阅 `/a/door/open`, +- Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。 +- C 发布 `/a/door/open` +- 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。 ## 客户端使用 ```java diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceA.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceA.java new file mode 100644 index 0000000000000000000000000000000000000000..3cc52e707832b3e8cb47012c0125344d9c9b3ec7 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceA.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.biz; + +import net.dreamlu.iot.mqtt.client.MqttClientTest; +import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; +import net.dreamlu.iot.mqtt.core.client.MqttClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 设备 A,这里默认 APP 应用端 + * + * @author L.cm + */ +public class DeviceA { + + private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class); + + public static void main(String[] args) { + // 初始化 mqtt 客户端 + MqttClient client = MqttClient.create() + .ip("127.0.0.1") + .port(1883) + .username("admin") + .password("123456") + .connect(); + + client.subQos0("/a/door/open", (topic, payload) -> { + logger.info(topic + '\t' + ByteBufferUtil.toString(payload)); + }); + } + +} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceB.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceB.java new file mode 100644 index 0000000000000000000000000000000000000000..d0444c183b698360d9a290ce66b27ec706612d84 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceB.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.biz; + +import net.dreamlu.iot.mqtt.client.MqttClientTest; +import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; +import net.dreamlu.iot.mqtt.core.client.MqttClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 设备 B,这里默认 web 端 + * + * @author L.cm + */ +public class DeviceB { + + private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class); + + public static void main(String[] args) { + // 初始化 mqtt 客户端 + MqttClient client = MqttClient.create() + .ip("127.0.0.1") + .port(1883) + .username("admin") + .password("123456") + .connect(); + + client.subQos0("/a/door/open", (topic, payload) -> { + logger.info(topic + '\t' + ByteBufferUtil.toString(payload)); + }); + } +} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceC.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceC.java new file mode 100644 index 0000000000000000000000000000000000000000..c0dbdb7d00dddcf7a92f1d9f06bb18c01ead094a --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/DeviceC.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.biz; + +import net.dreamlu.iot.mqtt.core.client.MqttClient; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; + +/** + * 设备 C,每 5 秒上报一个数据 + * + * @author L.cm + */ +public class DeviceC { + + public static void main(String[] args) { + // 初始化 mqtt 客户端 + MqttClient client = MqttClient.create() + .ip("127.0.0.1") + .port(1883) + .username("admin") + .password("123456") + .connect(); + + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + client.publish("/a/door/open", ByteBuffer.wrap("open".getBytes(StandardCharsets.UTF_8))); + } + }, 5000, 5000); + } + +} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java new file mode 100644 index 0000000000000000000000000000000000000000..9cc1e7d0fd7474805696aa7c193f6ef82e5a9225 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.biz; + +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; + +/** + * 服务端,单纯的做消息转发 + * + * @author L.cm + */ +public class Server { + + /** + * 客户端 A 模拟 APP 端订阅 `/a/door/open`, + * 客户端 B 模拟 web 网页端 mqtt.js 订阅 `/a/door/open`, + * Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。 + * 客户端 C 定时上报转态给 `/a/door/open` + * 结果:A 和 B 将收到 C 发布的消息,并完成相应的效果展示。 + */ + public static void main(String[] args) { + // 1. 消息转发处理器,可用来实现集群 + IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher(); + // 2. 收到消息,将消息转发出去 + IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> { + Message message = new Message(); + message.setTopic(topic); + message.setQos(mqttQoS.value()); + message.setPayload(payload.array()); + messageDispatcher.send(message); + }; + // 3. 启动服务 + MqttServer.create() + .ip("0.0.0.0") + .port(1883) + .readBufferSize(512) + .messageDispatcher(messageDispatcher) + .messageListener(messageListener) + .debug() + .start(); + } +}