From 9005449869bab2389d8b9a50c6d1c78570f335ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=85=E6=A2=A6?= <1101766085@qq.com> Date: Tue, 10 May 2022 12:29:46 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E5=BC=80=E5=A7=8B=201.3.3-SNAPSHO?= =?UTF-8?q?T?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RedisMqttMessageExchangeReceiver.java | 77 +------------ .../cluster/MqttClusterMessageListener.java | 109 ++++++++++++++++++ .../core/server/cluster/package-info.java | 1 - pom.xml | 2 +- 4 files changed, 114 insertions(+), 75 deletions(-) create mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/MqttClusterMessageListener.java delete mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/package-info.java diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageExchangeReceiver.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageExchangeReceiver.java index e46962e..3eb9f00 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageExchangeReceiver.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageExchangeReceiver.java @@ -16,20 +16,15 @@ package net.dreamlu.iot.mqtt.broker.cluster; -import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.MqttServer; -import net.dreamlu.iot.mqtt.core.server.enums.MessageType; +import net.dreamlu.iot.mqtt.core.server.cluster.MqttClusterMessageListener; import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; -import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; -import net.dreamlu.mica.core.utils.StringUtil; import org.springframework.beans.factory.InitializingBean; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; -import org.tio.core.ChannelContext; -import org.tio.core.Tio; import java.util.Objects; @@ -42,9 +37,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia private final RedisTemplate redisTemplate; private final IMessageSerializer messageSerializer; private final String channel; - private final MqttServer mqttServer; - private final String nodeName; - private final IMqttSessionManager sessionManager; + private final MqttClusterMessageListener clusterMessageListener; public RedisMqttMessageExchangeReceiver(RedisTemplate redisTemplate, IMessageSerializer messageSerializer, @@ -53,9 +46,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia this.redisTemplate = redisTemplate; this.messageSerializer = messageSerializer; this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null."); - this.mqttServer = mqttServer; - this.nodeName = mqttServer.getServerCreator().getNodeName(); - this.sessionManager = mqttServer.getServerCreator().getSessionManager(); + this.clusterMessageListener = new MqttClusterMessageListener(mqttServer); } @Override @@ -66,67 +57,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia if (mqttMessage == null) { return; } - messageProcessing(mqttMessage); - } - - private void messageProcessing(Message message) { - MessageType messageType = message.getMessageType(); - String topic = message.getTopic(); - if (MessageType.CONNECT == messageType) { - // 1. 如果一个 clientId 在集群多个服务上连接时断开其他的 - String node = message.getNode(); - if (nodeName.equals(node)) { - return; - } - String clientId = message.getClientId(); - ChannelContext context = Tio.getByBsId(mqttServer.getServerConfig(), clientId); - if (context != null) { - Tio.remove(context, String.format("clientId:[%s] now bind on mqtt node:[%s]", clientId, node)); - } - } else if (MessageType.SUBSCRIBE == messageType) { - // http api 订阅广播 - String formClientId = message.getFromClientId(); - ChannelContext context = mqttServer.getChannelContext(formClientId); - if (context != null) { - sessionManager.addSubscribe(topic, formClientId, message.getQos()); - } - } else if (MessageType.UNSUBSCRIBE == messageType) { - // http api 取消订阅广播 - String formClientId = message.getFromClientId(); - ChannelContext context = mqttServer.getChannelContext(formClientId); - if (context != null) { - sessionManager.removeSubscribe(topic, formClientId); - } - } else if (MessageType.UP_STREAM == messageType) { - // mqtt 上行消息,需要发送到对应的监听的客户端 - sendToClient(topic, message); - } else if (MessageType.DOWN_STREAM == messageType) { - // http rest api 下行消息也会转发到此 - sendToClient(topic, message); - } else if (MessageType.DISCONNECT == messageType) { - String clientId = message.getClientId(); - ChannelContext context = mqttServer.getChannelContext(clientId); - if (context != null) { - Tio.remove(context, "Mqtt server delete clients:" + clientId); - } - } - } - - /** - * 发送消息到客户端 - * - * @param topic topic - * @param message Message - */ - private void sendToClient(String topic, Message message) { - // 客户端id - String clientId = message.getClientId(); - MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); - if (StringUtil.isBlank(clientId)) { - mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain()); - } else { - mqttServer.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain()); - } + clusterMessageListener.onMessage(mqttMessage); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/MqttClusterMessageListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/MqttClusterMessageListener.java new file mode 100644 index 0000000..8ab4d06 --- /dev/null +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/MqttClusterMessageListener.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.core.server.cluster; + +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import net.dreamlu.iot.mqtt.core.server.enums.MessageType; +import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; +import org.tio.core.ChannelContext; +import org.tio.core.Tio; +import org.tio.utils.hutool.StrUtil; + +/** + * mqtt 集群消息处理 + * + * @author L.cm + */ +public class MqttClusterMessageListener { + private final String nodeName; + private final IMqttSessionManager sessionManager; + private final MqttServer mqttServer; + + public MqttClusterMessageListener(MqttServer mqttServer) { + this.nodeName = mqttServer.getServerCreator().getNodeName(); + this.sessionManager = mqttServer.getServerCreator().getSessionManager(); + this.mqttServer = mqttServer; + } + + /** + * 来着集群的消息 + * + * @param message Message + */ + public void onMessage(Message message) { + MessageType messageType = message.getMessageType(); + String topic = message.getTopic(); + if (MessageType.CONNECT == messageType) { + // 1. 如果一个 clientId 在集群多个服务上连接时断开其他的 + String node = message.getNode(); + if (nodeName.equals(node)) { + return; + } + String clientId = message.getClientId(); + ChannelContext context = Tio.getByBsId(mqttServer.getServerConfig(), clientId); + if (context != null) { + Tio.remove(context, String.format("clientId:[%s] now bind on mqtt node:[%s]", clientId, node)); + } + } else if (MessageType.SUBSCRIBE == messageType) { + // http api 订阅广播 + String formClientId = message.getFromClientId(); + ChannelContext context = mqttServer.getChannelContext(formClientId); + if (context != null) { + sessionManager.addSubscribe(topic, formClientId, message.getQos()); + } + } else if (MessageType.UNSUBSCRIBE == messageType) { + // http api 取消订阅广播 + String formClientId = message.getFromClientId(); + ChannelContext context = mqttServer.getChannelContext(formClientId); + if (context != null) { + sessionManager.removeSubscribe(topic, formClientId); + } + } else if (MessageType.UP_STREAM == messageType) { + // mqtt 上行消息,需要发送到对应的监听的客户端 + sendToClient(topic, message); + } else if (MessageType.DOWN_STREAM == messageType) { + // http rest api 下行消息也会转发到此 + sendToClient(topic, message); + } else if (MessageType.DISCONNECT == messageType) { + String clientId = message.getClientId(); + ChannelContext context = mqttServer.getChannelContext(clientId); + if (context != null) { + Tio.remove(context, "Mqtt server delete clients:" + clientId); + } + } + } + + /** + * 发送消息到客户端 + * + * @param topic topic + * @param message Message + */ + private void sendToClient(String topic, Message message) { + // 客户端id + String clientId = message.getClientId(); + MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); + if (StrUtil.isBlank(clientId)) { + mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain()); + } else { + mqttServer.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain()); + } + } + +} diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/package-info.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/package-info.java deleted file mode 100644 index f64c4bb..0000000 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/cluster/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package net.dreamlu.iot.mqtt.core.server.cluster; diff --git a/pom.xml b/pom.xml index 883215f..bd24abd 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ - 1.3.2 + 1.3.3-SNAPSHOT 1.8 UTF-8 -- GitLab