提交 90054498 编写于 作者: 浅梦2013's avatar 浅梦2013

开始 1.3.3-SNAPSHOT

上级 0dc1de9e
......@@ -16,20 +16,15 @@
package net.dreamlu.iot.mqtt.broker.cluster;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.cluster.MqttClusterMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.mica.core.utils.StringUtil;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.util.Objects;
......@@ -42,9 +37,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia
private final RedisTemplate<String, Object> redisTemplate;
private final IMessageSerializer messageSerializer;
private final String channel;
private final MqttServer mqttServer;
private final String nodeName;
private final IMqttSessionManager sessionManager;
private final MqttClusterMessageListener clusterMessageListener;
public RedisMqttMessageExchangeReceiver(RedisTemplate<String, Object> redisTemplate,
IMessageSerializer messageSerializer,
......@@ -53,9 +46,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia
this.redisTemplate = redisTemplate;
this.messageSerializer = messageSerializer;
this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null.");
this.mqttServer = mqttServer;
this.nodeName = mqttServer.getServerCreator().getNodeName();
this.sessionManager = mqttServer.getServerCreator().getSessionManager();
this.clusterMessageListener = new MqttClusterMessageListener(mqttServer);
}
@Override
......@@ -66,67 +57,7 @@ public class RedisMqttMessageExchangeReceiver implements MessageListener, Initia
if (mqttMessage == null) {
return;
}
messageProcessing(mqttMessage);
}
private void messageProcessing(Message message) {
MessageType messageType = message.getMessageType();
String topic = message.getTopic();
if (MessageType.CONNECT == messageType) {
// 1. 如果一个 clientId 在集群多个服务上连接时断开其他的
String node = message.getNode();
if (nodeName.equals(node)) {
return;
}
String clientId = message.getClientId();
ChannelContext context = Tio.getByBsId(mqttServer.getServerConfig(), clientId);
if (context != null) {
Tio.remove(context, String.format("clientId:[%s] now bind on mqtt node:[%s]", clientId, node));
}
} else if (MessageType.SUBSCRIBE == messageType) {
// http api 订阅广播
String formClientId = message.getFromClientId();
ChannelContext context = mqttServer.getChannelContext(formClientId);
if (context != null) {
sessionManager.addSubscribe(topic, formClientId, message.getQos());
}
} else if (MessageType.UNSUBSCRIBE == messageType) {
// http api 取消订阅广播
String formClientId = message.getFromClientId();
ChannelContext context = mqttServer.getChannelContext(formClientId);
if (context != null) {
sessionManager.removeSubscribe(topic, formClientId);
}
} else if (MessageType.UP_STREAM == messageType) {
// mqtt 上行消息,需要发送到对应的监听的客户端
sendToClient(topic, message);
} else if (MessageType.DOWN_STREAM == messageType) {
// http rest api 下行消息也会转发到此
sendToClient(topic, message);
} else if (MessageType.DISCONNECT == messageType) {
String clientId = message.getClientId();
ChannelContext context = mqttServer.getChannelContext(clientId);
if (context != null) {
Tio.remove(context, "Mqtt server delete clients:" + clientId);
}
}
}
/**
* 发送消息到客户端
*
* @param topic topic
* @param message Message
*/
private void sendToClient(String topic, Message message) {
// 客户端id
String clientId = message.getClientId();
MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos());
if (StringUtil.isBlank(clientId)) {
mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
} else {
mqttServer.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain());
}
clusterMessageListener.onMessage(mqttMessage);
}
@Override
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.cluster;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
/**
* mqtt 集群消息处理
*
* @author L.cm
*/
public class MqttClusterMessageListener {
private final String nodeName;
private final IMqttSessionManager sessionManager;
private final MqttServer mqttServer;
public MqttClusterMessageListener(MqttServer mqttServer) {
this.nodeName = mqttServer.getServerCreator().getNodeName();
this.sessionManager = mqttServer.getServerCreator().getSessionManager();
this.mqttServer = mqttServer;
}
/**
* 来着集群的消息
*
* @param message Message
*/
public void onMessage(Message message) {
MessageType messageType = message.getMessageType();
String topic = message.getTopic();
if (MessageType.CONNECT == messageType) {
// 1. 如果一个 clientId 在集群多个服务上连接时断开其他的
String node = message.getNode();
if (nodeName.equals(node)) {
return;
}
String clientId = message.getClientId();
ChannelContext context = Tio.getByBsId(mqttServer.getServerConfig(), clientId);
if (context != null) {
Tio.remove(context, String.format("clientId:[%s] now bind on mqtt node:[%s]", clientId, node));
}
} else if (MessageType.SUBSCRIBE == messageType) {
// http api 订阅广播
String formClientId = message.getFromClientId();
ChannelContext context = mqttServer.getChannelContext(formClientId);
if (context != null) {
sessionManager.addSubscribe(topic, formClientId, message.getQos());
}
} else if (MessageType.UNSUBSCRIBE == messageType) {
// http api 取消订阅广播
String formClientId = message.getFromClientId();
ChannelContext context = mqttServer.getChannelContext(formClientId);
if (context != null) {
sessionManager.removeSubscribe(topic, formClientId);
}
} else if (MessageType.UP_STREAM == messageType) {
// mqtt 上行消息,需要发送到对应的监听的客户端
sendToClient(topic, message);
} else if (MessageType.DOWN_STREAM == messageType) {
// http rest api 下行消息也会转发到此
sendToClient(topic, message);
} else if (MessageType.DISCONNECT == messageType) {
String clientId = message.getClientId();
ChannelContext context = mqttServer.getChannelContext(clientId);
if (context != null) {
Tio.remove(context, "Mqtt server delete clients:" + clientId);
}
}
}
/**
* 发送消息到客户端
*
* @param topic topic
* @param message Message
*/
private void sendToClient(String topic, Message message) {
// 客户端id
String clientId = message.getClientId();
MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos());
if (StrUtil.isBlank(clientId)) {
mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
} else {
mqttServer.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain());
}
}
}
......@@ -13,7 +13,7 @@
<properties>
<!-- mica-mqtt version -->
<revision>1.3.2</revision>
<revision>1.3.3-SNAPSHOT</revision>
<!-- java version -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册