提交 5e05576b 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善 mica-mqtt, 待续

上级 cd257261
......@@ -565,6 +565,11 @@ public final class MqttMessageBuilders {
return this;
}
public SubAckBuilder addGrantedQosList(List<MqttQoS> qosList) {
this.grantedQosList.addAll(qosList);
return this;
}
public MqttSubAckMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
......
......@@ -22,11 +22,11 @@ public enum MqttQoS {
*/
AT_MOST_ONCE(0),
/**
* QoS level 1 都要在可变头部中附加一个16位的消息ID,SUBSCRIBE 和 UNSUBSCRIBE 消息使用 QoS level 1。
* QoS level 1 至少一次,都要在可变头部中附加一个16位的消息ID,SUBSCRIBE 和 UNSUBSCRIBE 消息使用 QoS level 1。
*/
AT_LEAST_ONCE(1),
/**
* QoS level 2 仅仅在 PUBLISH 类型消息中出现,要求在可变头部中要附加消息ID。
* QoS level 2 确保只有一次,仅仅在 PUBLISH 类型消息中出现,要求在可变头部中要附加消息ID。
*/
EXACTLY_ONCE(2),
/**
......
......@@ -17,6 +17,8 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -111,7 +113,6 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
subscriptionManager.addPendingQos2Publish(packetId, pendingQos2Publish);
message.payload().clear();
pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
}
break;
......
......@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient;
import org.tio.core.Tio;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -122,7 +121,7 @@ public final class MqttClient {
int messageId = MqttClientMessageId.getId();
MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe()
.addTopicFilter(topicFilter)
.messageId(MqttClientMessageId.getId())
.messageId(messageId)
.build();
MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilter, message);
Boolean result = Tio.send(context, message);
......@@ -178,7 +177,8 @@ public final class MqttClient {
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
int messageId = MqttClientMessageId.getId();
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? MqttClientMessageId.getId() : -1;
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
......@@ -189,7 +189,7 @@ public final class MqttClient {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
Boolean result = Tio.send(context, message);
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos) {
if (isHighLevelQoS) {
subscriptionManager.addPendingPublish(messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
......@@ -225,7 +225,10 @@ public final class MqttClient {
public boolean stop() {
// 先断开连接
this.disconnect();
return tioClient.stop();
boolean result = tioClient.stop();
logger.info("MqttClient stop result:{}", result);
this.executor.shutdown();
return result;
}
/**
......
......@@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.DefaultClientAioListener;
......@@ -85,22 +86,22 @@ public class MqttClientAioListener extends DefaultClientAioListener {
// 5. 发送 mqtt 连接消息
Boolean result = Tio.send(context, builder.build());
logger.info("MqttClient reconnect send connect result:{}", result);
// 6. 发送重新订阅
// 6. 重连时发送重新订阅
reSendSubscription(context);
}
}
private void reSendSubscription(ChannelContext context) {
List<MqttSubscription> subscriptionList = subscriptionManager.getAndCleanSubscription();
for (MqttSubscription mqttSubscription : subscriptionList) {
for (MqttSubscription subscription : subscriptionList) {
int messageId = MqttClientMessageId.getId();
MqttQoS mqttQoS = mqttSubscription.getMqttQoS();
String topicFilter = mqttSubscription.getTopicFilter();
MqttQoS mqttQoS = subscription.getMqttQoS();
String topicFilter = subscription.getTopicFilter();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
.messageId(messageId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, mqttSubscription.getListener(), message);
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(mqttQoS, topicFilter, subscription.getListener(), message);
Boolean result = Tio.send(context, message);
logger.info("MQTT reconnect subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
......
......@@ -16,6 +16,8 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
import java.util.*;
......
......@@ -3,6 +3,7 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......
......@@ -5,6 +5,8 @@ import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
......
......@@ -2,6 +2,7 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
......
package net.dreamlu.iot.mqtt.core.client;
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
......@@ -9,27 +9,26 @@ import java.util.function.Consumer;
/**
* MqttPendingPublish,参考于 netty-mqtt-client
*/
final class MqttPendingQos2Publish {
public final class MqttPendingQos2Publish {
private final MqttPublishMessage incomingPublish;
private final RetryProcessor<MqttMessage> retransmissionHandler = new RetryProcessor<>();
MqttPendingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
public MqttPendingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
this.incomingPublish = incomingPublish;
this.retransmissionHandler.setOriginalMessage(originalMessage);
}
MqttPublishMessage getIncomingPublish() {
public MqttPublishMessage getIncomingPublish() {
return incomingPublish;
}
void startPubRecRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
public void startPubRecRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
this.retransmissionHandler.start(executor);
}
void onPubRelReceived() {
public void onPubRelReceived() {
this.retransmissionHandler.stop();
}
......
......@@ -14,11 +14,11 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.client;
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import java.io.Serializable;
import java.util.Objects;
import java.util.regex.Pattern;
......@@ -27,7 +27,7 @@ import java.util.regex.Pattern;
*
* @author L.cm
*/
final class MqttSubscription {
public final class MqttSubscription implements Serializable {
private final String topicFilter;
private final MqttQoS mqttQoS;
private final Pattern topicRegex;
......@@ -54,7 +54,7 @@ final class MqttSubscription {
return listener;
}
boolean matches(String topic) {
public boolean matches(String topic) {
return this.topicRegex.matcher(topic).matches();
}
......
package net.dreamlu.iot.mqtt.core.client;
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.codec.MqttFixedHeader;
......@@ -15,14 +15,14 @@ import java.util.function.BiConsumer;
*
* @param <T> MqttMessage
*/
final class RetryProcessor<T extends MqttMessage> {
public final class RetryProcessor<T extends MqttMessage> {
private ScheduledFuture<?> timer;
private int timeout = 10;
private BiConsumer<MqttFixedHeader, T> handler;
private T originalMessage;
void start(ScheduledThreadPoolExecutor executor) {
public void start(ScheduledThreadPoolExecutor executor) {
Objects.requireNonNull(executor, "RetryProcessor executor is null.");
Objects.requireNonNull(this.handler, "RetryProcessor handler is null.");
this.timeout = 10;
......@@ -38,17 +38,17 @@ final class RetryProcessor<T extends MqttMessage> {
}, timeout, TimeUnit.SECONDS);
}
void stop() {
public void stop() {
if (this.timer != null) {
this.timer.cancel(true);
}
}
void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
public void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
this.handler = runnable;
}
void setOriginalMessage(T originalMessage) {
public void setOriginalMessage(T originalMessage) {
this.originalMessage = originalMessage;
}
......
package net.dreamlu.iot.mqtt.core.server;
/**
* mqtt 服务端,认证处理器
*
* @author L.cm
*/
public interface IMqttAuthHandler {
/**
* 认证
*
* @param clientId 客户端 ID
* @param userName 用户名
* @param password 密码
* @return 是否认证成功
*/
boolean authenticate(String clientId, String userName, String password);
}
package net.dreamlu.iot.mqtt.core.server;
/**
* 服务端消息id
*
* @author L.cm
*/
public interface IMqttMessageIdGenerator {
/**
* 获取消息 id
*
* @return 消息id
*/
int getId();
}
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import java.util.List;
/**
* mqtt 服务端 订阅管理
*
* @author L.cm
*/
public interface IMqttSubManager {
/**
* 注册订阅
*
* @param subscription 订阅信息
*/
void register(MqttSubscription subscription);
/**
* 获取匹配的订阅
*
* @param topicName topicName
* @param mqttQoS MqttQoS
* @return 订阅信息
*/
List<MqttSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS);
/**
* 清理
*/
void clean();
}
......@@ -16,10 +16,20 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.utils.lock.SetWithLock;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 服务端
......@@ -29,9 +39,15 @@ import org.tio.server.TioServer;
public final class MqttServer {
private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
private final TioServer tioServer;
private final IMqttMessageIdGenerator messageIdGenerator;
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer) {
MqttServer(TioServer tioServer,
IMqttMessageIdGenerator messageIdGenerator,
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.messageIdGenerator = messageIdGenerator;
this.executor = executor;
}
public static MqttServerCreator create() {
......@@ -47,9 +63,154 @@ public final class MqttServer {
return this.tioServer.getServerTioConfig();
}
/**
* 发布消息
*
* @param clientId clientId
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public Boolean publish(String clientId, String topic, ByteBuffer payload) {
return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
* @param clientId clientId
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public Boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
return publish(clientId, topic, payload, qos, false);
}
/**
* 发布消息
*
* @param clientId clientId
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息
*
* @param clientId clientId
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
if (context == null) {
logger.warn("Mqtt publish to clientId:{} ChannelContext is null May be disconnected.", clientId);
return false;
}
return publish(context, topic, payload, qos, retain);
}
/**
* 发布消息
*
* @param context ChannelContext
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
private Boolean publish(ChannelContext context, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? messageIdGenerator.getId() : -1;
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
.qos(qos)
.retained(retain)
.messageId(messageId)
.build();
Boolean result = Tio.send(context, message);
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (isHighLevelQoS) {
}
return result;
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public Boolean publishAll(String topic, ByteBuffer payload) {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public Boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
return publishAll(topic, payload, qos, false);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
SetWithLock<ChannelContext> contextSet = Tio.getAll(getServerConfig());
Set<ChannelContext> channelContexts = contextSet.getObj();
if (channelContexts.isEmpty()) {
logger.warn("Mqtt publish to all ChannelContext is empty.");
return false;
}
for (ChannelContext context : channelContexts) {
String clientId = context.getBsId();
payload.rewind();
publish(clientId, topic, payload, qos, retain);
}
return true;
}
public boolean stop() {
boolean result = this.tioServer.stop();
logger.info("MqttServer stop result:{}", result);
this.executor.shutdown();
return result;
}
......
......@@ -16,8 +16,11 @@
package net.dreamlu.iot.mqtt.core.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.DefaultAioListener;
import org.tio.core.Tio;
/**
* mqtt 服务监听
......@@ -25,11 +28,21 @@ import org.tio.core.DefaultAioListener;
* @author L.cm
*/
public class MqttServerAioListener extends DefaultAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
@Override
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
// TODO L.cm 微调此处,三次超时时断开,避免长时间占用服务器连接
String clientId = context.getBsId();
logger.info("Mqtt HeartbeatTimeout clientId:{} interval:{} count:{}", clientId, interval, heartbeatTimeoutCount);
return true;
}
@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
String clientId = context.getBsId();
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
Tio.unbindBsId(context);
}
}
......@@ -23,10 +23,12 @@ import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 服务端参数构造
......@@ -67,6 +69,10 @@ public class MqttServerCreator {
* tio 的 IpStatListener
*/
private IpStatListener ipStatListener;
/**
* messageId 生成
*/
private IMqttMessageIdGenerator messageIdGenerator;
/**
* mqtt 服务端处理逻辑
*/
......@@ -157,6 +163,15 @@ public class MqttServerCreator {
return this;
}
public IMqttMessageIdGenerator getMessageIdGenerator() {
return messageIdGenerator;
}
public MqttServerCreator messageIdGenerator(IMqttMessageIdGenerator messageIdGenerator) {
this.messageIdGenerator = messageIdGenerator;
return this;
}
public MqttServerProcessor getMqttServerCreatorProcessor() {
return mqttServerProcessor;
}
......@@ -168,6 +183,7 @@ public class MqttServerCreator {
public MqttServer start() throws IOException {
Objects.requireNonNull(this.mqttServerProcessor, "Argument mqttServerProcessor is null.");
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttServer"));
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, this.mqttServerProcessor);
// 监听
......@@ -192,7 +208,7 @@ public class MqttServerCreator {
tioServer.setCheckLastVersion(false);
// 启动
tioServer.start(this.ip, this.port);
return new MqttServer(tioServer);
return new MqttServer(tioServer, this.messageIdGenerator, executor);
}
}
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* 默认的 mqtt 订阅管理
*
* @author L.cm
*/
public class MqttServerDefaultSubManager implements IMqttSubManager {
private final List<MqttSubscription> subscriptionList = new LinkedList<>();
@Override
public void register(MqttSubscription subscription) {
subscriptionList.add(subscription);
}
@Override
public List<MqttSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS) {
List<MqttSubscription> list = new ArrayList<>();
for (MqttSubscription subscription : subscriptionList) {
MqttQoS qos = subscription.getMqttQoS();
if (subscription.matches(topicName) && (qos == null || qos == mqttQoS)) {
list.add(subscription);
}
}
return Collections.unmodifiableList(list);
}
@Override
public void clean() {
subscriptionList.clear();
}
}
......@@ -49,7 +49,7 @@ public class MqttClientTest {
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("testtopicxx", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
}
}, 1000, 2000);
}
......
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 仅仅作为演示,实际需要考虑集群
*/
public class MqttMessageIdGenerator implements IMqttMessageIdGenerator {
private final AtomicInteger value = new AtomicInteger(1);
@Override
public int getId() {
this.value.compareAndSet(0xffff, 1);
return this.value.getAndIncrement();
}
}
......@@ -17,15 +17,21 @@
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.IMqttAuthHandler;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.IMqttSubManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
/**
......@@ -33,63 +39,88 @@ import java.util.stream.Collectors;
*
* @author L.cm
*/
public class MqttBrokerProcessorImpl implements MqttServerProcessor {
private static final Logger log = LoggerFactory.getLogger(MqttBrokerProcessorImpl.class);
public class MqttServerProcessorImpl implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(MqttServerProcessorImpl.class);
private final IMqttAuthHandler authHandler;
private final IMqttMessageIdGenerator messageIdGenerator;
private final IMqttSubManager subManager;
private final ScheduledThreadPoolExecutor executor;
public MqttServerProcessorImpl(IMqttAuthHandler authHandler,
IMqttSubManager subManager,
IMqttMessageIdGenerator messageIdGenerator,
ScheduledThreadPoolExecutor executor) {
this.authHandler = authHandler;
this.subManager = subManager;
this.messageIdGenerator = messageIdGenerator;
this.executor = executor;
}
@Override
public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
MqttConnectPayload payload = mqttMessage.payload();
String clientId = payload.clientIdentifier();
// 1. 客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
// 1. 客户端必须提供 clientId, 不管 cleanSession 是否为1, 此处没有参考标准协议实现
if (StrUtil.isBlank(clientId)) {
refusedIdentifierRejected(context);
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return;
}
log.debug("CONNECT - clientId: {}", clientId);
// 2. 认证
String userName = payload.userName();
String password = payload.password();
boolean authResult = false;
if (false) {
MqttConnAckMessage message = MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)
.sessionPresent(false)
.build();
Tio.send(context, message);
if (!authHandler.authenticate(clientId, userName, password)) {
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
// 3. 设置 clientId
context.setBsId(clientId);
Tio.bindBsId(context, clientId);
// 4. 返回 ack
MqttProperties mqttProperties = new MqttProperties();
MqttProperties.UserProperties userProperty = new MqttProperties.UserProperties();
userProperty.add("xxxxxxxxxx", "xxxx");
mqttProperties.add(userProperty);
MqttMessage message = MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED)
.sessionPresent(false)
.properties(mqttProperties)
.build();
Tio.send(context, message);
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
private void refusedIdentifierRejected(ChannelContext context) {
private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) {
MqttConnAckMessage message = MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)
.returnCode(returnCode)
.sessionPresent(false)
.build();
Tio.send(context, message);
logger.debug("Connect ack send - clientId: {} returnCode:{}", clientId, returnCode);
}
@Override
public void processPublish(ChannelContext context, MqttPublishMessage message) {
String clientId = context.getBsId();
log.debug("PUBLISH - clientId: {}", clientId);
MqttFixedHeader fixedHeader = message.fixedHeader();
ByteBuffer payload = message.payload();
if (payload != null) {
System.out.println(ByteBufferUtil.toString(payload));
MqttQoS mqttQoS = fixedHeader.qosLevel();
MqttPublishVariableHeader variableHeader = message.variableHeader();
String topicName = variableHeader.topicName();
int packetId = variableHeader.packetId();
logger.debug("Publish - clientId: {} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
switch (mqttQoS) {
case AT_MOST_ONCE:
invokeListenerForPublish(mqttQoS, topicName, message);
break;
case AT_LEAST_ONCE:
invokeListenerForPublish(mqttQoS, topicName, message);
if (packetId != -1) {
MqttMessage messageAck = MqttMessageBuilders.pubAck()
.packetId(packetId)
.build();
Tio.send(context, messageAck);
}
break;
case EXACTLY_ONCE:
if (packetId != -1) {
MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, MqttMessageIdVariableHeader.from(packetId));
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
// subscriptionManager.addPendingQos2Publish(packetId, pendingQos2Publish);
pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
}
break;
case FAILURE:
default:
}
}
......@@ -97,23 +128,25 @@ public class MqttBrokerProcessorImpl implements MqttServerProcessor {
public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
log.debug("PUBACK - clientId: {}, messageId: {}", clientId, messageId);
logger.debug("PubAck - clientId: {}, messageId: {}", clientId, messageId);
}
@Override
public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = context.getBsId();
log.debug("PUBREC - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
int messageId = variableHeader.messageId();
logger.debug("PubRec - clientId: {}, messageId: {}", clientId, messageId);
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
MqttMessageIdVariableHeader.from(messageId), null);
Tio.send(context, message);
}
@Override
public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = context.getBsId();
log.debug("PUBREL - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
logger.debug("PubRel - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
// TODO L.cm invokeListenerForPublish
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
......@@ -124,45 +157,66 @@ public class MqttBrokerProcessorImpl implements MqttServerProcessor {
public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
log.debug("PUBCOMP - clientId: {}, messageId: {}", clientId, messageId);
logger.debug("PubComp - clientId: {}, messageId: {}", clientId, messageId);
}
@Override
public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
String clientId = context.getBsId();
int messageId = message.variableHeader().messageId();
logger.debug("Subscribe - clientId: {} messageId:{}", clientId, messageId);
List<MqttTopicSubscription> topicSubscriptions = message.payload().topicSubscriptions();
List<Integer> mqttQoSList = topicSubscriptions.stream()
// 1. 校验 topicFilter
// 2. 存储 clientId 订阅的 topic
// 3. 返回 ack
List<MqttQoS> mqttQoSList = topicSubscriptions.stream()
.map(MqttTopicSubscription::qualityOfService)
.map(MqttQoS::value)
.collect(Collectors.toList());
MqttMessage subAckMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(message.variableHeader().messageId()),
new MqttSubAckPayload(mqttQoSList));
MqttMessage subAckMessage = MqttMessageBuilders.subAck()
.addGrantedQosList(mqttQoSList)
.packetId(messageId)
.build();
Tio.send(context, subAckMessage);
}
@Override
public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage mqttMessage) {
public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
String clientId = context.getBsId();
log.debug("UnSubscribe - clientId: {}", clientId);
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(mqttMessage.variableHeader().messageId()), null);
Tio.send(context, message);
int messageId = message.variableHeader().messageId();
logger.debug("UnSubscribe - clientId: {} messageId:{}", clientId, messageId);
MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
.packetId(messageId)
.build();
Tio.send(context, unSubMessage);
}
@Override
public void processPingReq(ChannelContext context) {
String clientId = context.getBsId();
log.debug("PINGREQ - clientId: {}", clientId);
logger.debug("PingReq - clientId: {}", clientId);
Tio.send(context, MqttMessage.PINGRESP);
}
@Override
public void processDisConnect(ChannelContext context) {
String clientId = context.getBsId();
log.debug("DISCONNECT - clientId: {}", clientId);
Tio.close(context, "MqttIdentifierRejected");
logger.info("DisConnect - clientId: {}", clientId);
Tio.unbindBsId(context);
Tio.close(context, "Mqtt DisConnect");
}
/**
* 处理订阅的消息
*
* @param topicName topicName
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(MqttQoS mqttQoS, String topicName, MqttPublishMessage message) {
List<MqttSubscription> subscriptionList = subManager.getMatchedSubscription(topicName, mqttQoS);
subscriptionList.forEach(subscription -> {
MqttMessageListener listener = subscription.getListener();
listener.onMessage(topicName, message.payload());
});
}
}
......@@ -16,18 +16,19 @@
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.IMqttAuthHandler;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.utils.lock.SetWithLock;
import net.dreamlu.iot.mqtt.core.server.MqttServerDefaultSubManager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 服务端测试
......@@ -37,6 +38,17 @@ import java.util.TimerTask;
public class MqttServerTest {
public static void main(String[] args) throws IOException {
IMqttAuthHandler authHandler = (clientId, userName, password) -> true;
MqttServerDefaultSubManager subManager = new MqttServerDefaultSubManager();
subManager.register(new MqttSubscription(MqttQoS.AT_MOST_ONCE, "/test/#", ((topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
})));
IMqttMessageIdGenerator messageIdGenerator = new MqttMessageIdGenerator();
ScheduledThreadPoolExecutor executor = null;
MqttServerProcessorImpl processor = new MqttServerProcessorImpl(authHandler, subManager, messageIdGenerator, executor);
MqttServer mqttServer = MqttServer.create()
// 默认 MICA-MQTT-SERVER
.name("mqtt-server")
......@@ -44,24 +56,15 @@ public class MqttServerTest {
.ip("127.0.0.1")
// 默认:1883
.port(1883)
.processor(new MqttBrokerProcessorImpl())
.messageIdGenerator(messageIdGenerator)
.processor(processor)
.start();
ServerTioConfig serverConfig = mqttServer.getServerConfig();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
SetWithLock<ChannelContext> contextSet = Tio.getAll(serverConfig);
Set<ChannelContext> channelContexts = contextSet.getObj();
channelContexts.forEach(context -> {
System.out.println(String.format("MqttServer send to clientId:%s", context.getBsId()));
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader("/test/123", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
Tio.send(context, message);
});
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
}
}, 1000, 2000);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册