提交 ede842be 编写于 作者: 浅梦2013's avatar 浅梦2013

回退部分代码,保证低版本兼容性。

上级 39bb58ee
......@@ -72,6 +72,15 @@ public final class MqttServer {
return this.tioServer;
}
/**
* 获取 http、websocket 服务
*
* @return MqttWebServer
*/
public MqttWebServer getWebServer() {
return webServer;
}
/**
* 获取 ServerTioConfig
*
......@@ -81,6 +90,15 @@ public final class MqttServer {
return this.tioServer.getServerTioConfig();
}
/**
* 获取 mqtt 配置
*
* @return MqttServerCreator
*/
public MqttServerCreator getServerCreator() {
return serverCreator;
}
/**
* 发布消息
*
......@@ -244,6 +262,30 @@ public final class MqttServer {
return true;
}
/**
* 获取 ChannelContext
*
* @param clientId clientId
* @return ChannelContext
*/
public ChannelContext getChannelContext(String clientId) {
return Tio.getByBsId(getServerConfig(), clientId);
}
/**
* 服务端主动断开连接
*
* @param clientId clientId
*/
public void close(String clientId) {
Tio.remove(getChannelContext(clientId), "Mqtt server close this connects.");
}
/**
* 启动服务
*
* @return 是否启动
*/
public boolean start() {
// 1. 启动 mqtt tcp
try {
......@@ -262,6 +304,11 @@ public final class MqttServer {
return true;
}
/**
* 停止服务
*
* @return 是否停止
*/
public boolean stop() {
boolean result = this.tioServer.stop();
logger.info("Mqtt tcp server stop result:{}", result);
......
......@@ -76,7 +76,7 @@ public class MqttServerAioListener extends DefaultAioListener {
// 6. 解绑 clientId
Tio.unbindBsId(context);
// 7. 下线事件
notify(context, clientId);
notify(clientId);
}
private void sendWillMessage(ChannelContext context, String clientId) {
......@@ -108,9 +108,9 @@ public class MqttServerAioListener extends DefaultAioListener {
}
}
private void notify(ChannelContext context, String clientId) {
private void notify(String clientId) {
try {
connectStatusListener.offline(context, clientId);
connectStatusListener.offline(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
......
......@@ -31,7 +31,6 @@ import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttConnectStatusListener
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler;
import org.tio.core.ssl.SslConfig;
import org.tio.core.stat.IpStatListener;
import org.tio.server.ServerTioConfig;
......@@ -40,13 +39,7 @@ import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.websocket.common.WsTioUuid;
import org.tio.websocket.server.WsServerAioHandler;
import org.tio.websocket.server.WsServerAioListener;
import org.tio.websocket.server.WsServerConfig;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......
......@@ -16,8 +16,6 @@
package net.dreamlu.iot.mqtt.core.server.event;
import org.tio.core.ChannelContext;
/**
* mqtt 链接状态事件
*
......@@ -28,17 +26,15 @@ public interface IMqttConnectStatusListener {
/**
* 设备上线(连接成功)
*
* @param context ChannelContext
* @param clientId clientId
*/
void online(ChannelContext context, String clientId);
void online(String clientId);
/**
* 设备离线
*
* @param context ChannelContext
* @param clientId clientId
*/
void offline(ChannelContext context, String clientId);
void offline(String clientId);
}
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.core.server.event;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
......@@ -32,12 +31,11 @@ public interface IMqttMessageListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param clientId clientId
* @param topic topic
* @param mqttQoS MqttQoS
* @param payload payload
*/
void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload);
void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload);
}
......@@ -22,13 +22,13 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm;
import net.dreamlu.iot.mqtt.core.util.PayloadEncode;
import net.dreamlu.iot.mqtt.core.server.http.api.form.PublishForm;
import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.util.PayloadEncode;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.http.common.Method;
......
......@@ -19,7 +19,6 @@ package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
* 默认的链接状态监听
......@@ -30,12 +29,12 @@ public class DefaultMqttConnectStatusListener implements IMqttConnectStatusListe
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttConnectStatusListener.class);
@Override
public void online(ChannelContext context, String clientId) {
public void online(String clientId) {
logger.info("Mqtt clientId:{} online.", clientId);
}
@Override
public void offline(ChannelContext context, String clientId) {
public void offline(String clientId) {
logger.info("Mqtt clientId:{} offline.", clientId);
}
}
......@@ -118,7 +118,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
// 8. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 9. 在线状态
connectStatusListener.online(context, clientId);
connectStatusListener.online(clientId);
}
private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) {
......@@ -141,10 +141,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
switch (mqttQoS) {
case AT_MOST_ONCE:
invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
break;
case AT_LEAST_ONCE:
invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
if (packetId != -1) {
MqttMessage messageAck = MqttMessageBuilders.pubAck()
.packetId(packetId)
......@@ -215,7 +215,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
boolean retain = incomingFixedHeader.isRetain();
invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish, retain);
invokeListenerForPublish(clientId, mqttQoS, topicName, incomingPublish, retain);
pendingQos2Publish.onPubRelReceived();
sessionManager.removePendingQos2Publish(clientId, messageId);
}
......@@ -312,8 +312,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
* @param topicName topicName
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS,
String topicName, MqttPublishMessage message, boolean isRetain) {
private void invokeListenerForPublish(String clientId, MqttQoS mqttQoS, String topicName,
MqttPublishMessage message, boolean isRetain) {
ByteBuffer payload = message.payload();
// 1. retain 消息逻辑
if (isRetain) {
......@@ -331,7 +331,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
}
}
// 2. 消息发布
messageListener.onMessage(context, clientId, topicName, mqttQoS, payload);
messageListener.onMessage(clientId, topicName, mqttQoS, payload);
}
}
......@@ -40,7 +40,7 @@ public class Server {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (context, clientId, topic, mqttQoS, payload) -> {
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
......
......@@ -46,7 +46,7 @@ public class MqttServerTest {
// .maxBytesInMessage(1024 * 100)
// mqtt 3.1 协议会校验 clientId 长度。
// .maxClientIdLength(64)
.messageListener((context, clientId, topic, mqttQoS, payload) -> {
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
.debug() // 开启 debug 信息日志
......
......@@ -45,7 +45,7 @@ public class MqttServerTest {
.readBufferSize(512)
// 关闭 websocket,避免和 spring boot 启动的冲突
.websocketEnable(false)
.messageListener((context, clientId, topic, mqttQoS, payload) -> {
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
.debug() // 开启 debug 信息日志
......
......@@ -6,7 +6,6 @@ import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
......@@ -18,7 +17,7 @@ public class MqttServerMessageListener implements IMqttMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) {
public void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(byteBuffer));
}
}
......@@ -19,7 +19,6 @@ package net.dreamlu.iot.mqtt.spring.client;
import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
import net.dreamlu.iot.mqtt.core.client.MqttWillMessage;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
......
......@@ -145,4 +145,24 @@ public class MqttServerTemplate {
public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return mqttServer.publishAll(topic, payload, qos, retain);
}
/**
* 获取 ChannelContext
*
* @param clientId clientId
* @return ChannelContext
*/
public ChannelContext getChannelContext(String clientId) {
return mqttServer.getChannelContext(clientId);
}
/**
* 服务端主动断开连接
*
* @param clientId clientId
*/
public void close(String clientId) {
mqttServer.close(clientId);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册