提交 5b657864 编写于 作者: 浅梦2013's avatar 浅梦2013

🐛 修复默认的消息转发器逻辑。

上级 cd76f2ad
...@@ -58,13 +58,17 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa ...@@ -58,13 +58,17 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa
@Override @Override
public boolean send(Message message) { public boolean send(Message message) {
Objects.requireNonNull(mqttServer, "MqttServer require not Null."); Objects.requireNonNull(mqttServer, "MqttServer require not Null.");
// 1. 先发送到本服务
ByteBuffer payload = ByteBuffer.wrap(message.getPayload());
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
mqttServer.publishAll(message.getTopic(), payload, qoS);
return sendAll(message); return sendAll(message);
} }
@Override @Override
public boolean send(String clientId, Message message) { public boolean send(String clientId, Message message) {
Objects.requireNonNull(mqttServer, "MqttServer require not Null."); Objects.requireNonNull(mqttServer, "MqttServer require not Null.");
// 判断如果 clientId 就在本服务 // 1. 判断如果 clientId 就在本服务,存在则发送
ServerTioConfig config = this.mqttServer.getServerConfig(); ServerTioConfig config = this.mqttServer.getServerConfig();
ChannelContext context = Tio.getByBsId(config, clientId); ChannelContext context = Tio.getByBsId(config, clientId);
if (context != null) { if (context != null) {
......
...@@ -16,12 +16,9 @@ ...@@ -16,12 +16,9 @@
package net.dreamlu.iot.mqtt.core.server.support; package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.model.Message;
import java.nio.ByteBuffer;
/** /**
* 默认的消息转发器 * 默认的消息转发器
* *
...@@ -31,16 +28,12 @@ public class DefaultMqttMessageDispatcher extends AbstractMqttMessageDispatcher ...@@ -31,16 +28,12 @@ public class DefaultMqttMessageDispatcher extends AbstractMqttMessageDispatcher
@Override @Override
public boolean sendAll(Message message) { public boolean sendAll(Message message) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); return true;
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
return mqttServer.publishAll(message.getTopic(), payload, qoS);
} }
@Override @Override
public boolean sendTo(String clientId, Message message) { public boolean sendTo(String clientId, Message message) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); return true;
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
return mqttServer.publish(clientId, message.getTopic(), payload, qoS);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册