提交 521bfd99 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善 mica-mqtt

上级 40859f62
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
......@@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -36,14 +38,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/
public class DefaultMqttClientProcessor implements MqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientSubscriptionManager subscriptionManager;
private final MqttClientStore clientStore;
private final CountDownLatch connLatch;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttClientProcessor(MqttClientSubscriptionManager subscriptionManager,
public DefaultMqttClientProcessor(MqttClientStore clientStore,
CountDownLatch connLatch,
ScheduledThreadPoolExecutor executor) {
this.subscriptionManager = subscriptionManager;
this.clientStore = clientStore;
this.connLatch = connLatch;
this.executor = executor;
}
......@@ -70,7 +72,6 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
default:
String remark = "MqttClient connect error error ReturnCode:" + returnCode;
Tio.close(context, remark);
throw new IllegalStateException(remark);
}
}
......@@ -78,13 +79,13 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
public void processSubAck(MqttSubAckMessage message) {
int messageId = message.variableHeader().messageId();
logger.debug("MqttClient SubAck messageId:{}", messageId);
MqttPendingSubscription paddingSubscribe = subscriptionManager.getPaddingSubscribe(messageId);
MqttPendingSubscription paddingSubscribe = clientStore.getPaddingSubscribe(messageId);
if (paddingSubscribe == null) {
return;
}
paddingSubscribe.onSubAckReceived();
subscriptionManager.removePaddingSubscribe(messageId);
subscriptionManager.addSubscription(paddingSubscribe.toSubscription());
clientStore.removePaddingSubscribe(messageId);
clientStore.addSubscription(paddingSubscribe.toSubscription());
}
@Override
......@@ -113,7 +114,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
subscriptionManager.addPendingQos2Publish(packetId, pendingQos2Publish);
clientStore.addPendingQos2Publish(packetId, pendingQos2Publish);
pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
}
break;
......@@ -126,25 +127,25 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
public void processUnSubAck(MqttUnsubAckMessage message) {
int messageId = message.variableHeader().messageId();
logger.debug("MqttClient UnSubAck messageId:{}", messageId);
MqttPendingUnSubscription pendingUnSubscription = subscriptionManager.getPaddingUnSubscribe(messageId);
MqttPendingUnSubscription pendingUnSubscription = clientStore.getPaddingUnSubscribe(messageId);
if (pendingUnSubscription == null) {
return;
}
pendingUnSubscription.onUnSubAckReceived();
subscriptionManager.removePaddingUnSubscribe(messageId);
subscriptionManager.removeSubscriptions(pendingUnSubscription.getTopic());
clientStore.removePaddingUnSubscribe(messageId);
clientStore.removeSubscriptions(pendingUnSubscription.getTopic());
}
@Override
public void processPubAck(MqttPubAckMessage message) {
int messageId = message.variableHeader().messageId();
logger.debug("MqttClient PubAck messageId:{}", messageId);
MqttPendingPublish pendingPublish = subscriptionManager.getPendingPublish(messageId);
MqttPendingPublish pendingPublish = clientStore.getPendingPublish(messageId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
subscriptionManager.removePendingPublish(messageId);
clientStore.removePendingPublish(messageId);
pendingPublish.getPayload().clear();
}
......@@ -152,7 +153,7 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
public void processPubRec(ChannelContext context, MqttMessage message) {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
logger.debug("MqttClient PubRec messageId:{}", messageId);
MqttPendingPublish pendingPublish = subscriptionManager.getPendingPublish(messageId);
MqttPendingPublish pendingPublish = clientStore.getPendingPublish(messageId);
pendingPublish.onPubAckReceived();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
......@@ -168,13 +169,13 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
public void processPubRel(ChannelContext context, MqttMessage message) {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
logger.debug("MqttClient PubRel messageId:{}", messageId);
MqttPendingQos2Publish pendingQos2Publish = subscriptionManager.getPendingQos2Publish(messageId);
MqttPendingQos2Publish pendingQos2Publish = clientStore.getPendingQos2Publish(messageId);
if (pendingQos2Publish != null) {
MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
String topicName = incomingPublish.variableHeader().topicName();
this.invokeListenerForPublish(topicName, incomingPublish);
pendingQos2Publish.onPubRelReceived();
subscriptionManager.removePendingQos2Publish(messageId);
clientStore.removePendingQos2Publish(messageId);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
......@@ -184,10 +185,10 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
@Override
public void processPubComp(MqttMessage message) {
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
MqttPendingPublish pendingPublish = subscriptionManager.getPendingPublish(messageId);
MqttPendingPublish pendingPublish = clientStore.getPendingPublish(messageId);
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
subscriptionManager.removePendingPublish(messageId);
clientStore.removePendingPublish(messageId);
}
/**
......@@ -197,8 +198,13 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(String topicName, MqttPublishMessage message) {
List<MqttSubscription> subscriptionList = subscriptionManager.getMatchedSubscription(topicName);
subscriptionList.forEach(subscription -> subscription.getListener().onMessage(topicName, message.payload()));
List<MqttSubscription> subscriptionList = clientStore.getMatchedSubscription(topicName);
final ByteBuffer payload = message.payload();
subscriptionList.forEach(subscription -> {
MqttMessageListener listener = subscription.getListener();
payload.rewind();
listener.onMessage(topicName, payload);
});
}
}
......@@ -38,7 +38,7 @@ public final class MqttClient {
private final TioClient tioClient;
private final MqttClientCreator config;
private final ClientChannelContext context;
private final MqttClientSubscriptionManager subscriptionManager;
private final MqttClientStore clientStore;
private final ScheduledThreadPoolExecutor executor;
public static MqttClientCreator create() {
......@@ -48,12 +48,12 @@ public final class MqttClient {
MqttClient(TioClient tioClient,
MqttClientCreator config,
ClientChannelContext context,
MqttClientSubscriptionManager subscriptionManager,
MqttClientStore clientStore,
ScheduledThreadPoolExecutor executor) {
this.tioClient = tioClient;
this.config = config;
this.context = context;
this.subscriptionManager = subscriptionManager;
this.clientStore = clientStore;
this.executor = executor;
}
......@@ -108,7 +108,7 @@ public final class MqttClient {
Boolean result = Tio.send(context, message);
logger.debug("MQTT subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
subscriptionManager.addPaddingSubscribe(messageId, pendingSubscription);
clientStore.addPaddingSubscribe(messageId, pendingSubscription);
return this;
}
......@@ -128,7 +128,7 @@ public final class MqttClient {
Boolean result = Tio.send(context, message);
logger.debug("MQTT unSubscribe topicFilter:{} messageId:{} result:{}", topicFilter, messageId, result);
// 解绑 subManage listener
subscriptionManager.addPaddingUnSubscribe(messageId, pendingUnSubscription);
clientStore.addPaddingUnSubscribe(messageId, pendingUnSubscription);
pendingUnSubscription.startRetransmissionTimer(executor, msg -> Tio.send(context, msg));
return this;
}
......@@ -191,7 +191,7 @@ public final class MqttClient {
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
subscriptionManager.addPendingPublish(messageId, pendingPublish);
clientStore.addPendingPublish(messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
return result;
......@@ -229,6 +229,7 @@ public final class MqttClient {
boolean result = tioClient.stop();
logger.info("MqttClient stop result:{}", result);
this.executor.shutdown();
this.clientStore.clean();
return result;
}
......
......@@ -42,15 +42,15 @@ public class MqttClientAioListener extends DefaultClientAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
private final MqttClientCreator clientConfig;
private final MqttWillMessage willMessage;
private final MqttClientSubscriptionManager subscriptionManager;
private final MqttClientStore clientStore;
private final ScheduledThreadPoolExecutor executor;
public MqttClientAioListener(MqttClientCreator clientConfig,
MqttClientSubscriptionManager subscriptionManager,
MqttClientStore clientStore,
ScheduledThreadPoolExecutor executor) {
this.clientConfig = Objects.requireNonNull(clientConfig);
this.willMessage = clientConfig.getWillMessage();
this.subscriptionManager = subscriptionManager;
this.clientStore = clientStore;
this.executor = executor;
}
......@@ -92,7 +92,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
}
private void reSendSubscription(ChannelContext context) {
List<MqttSubscription> subscriptionList = subscriptionManager.getAndCleanSubscription();
List<MqttSubscription> subscriptionList = clientStore.getAndCleanSubscription();
for (MqttSubscription subscription : subscriptionList) {
int messageId = MqttClientMessageId.getId();
MqttQoS mqttQoS = subscription.getMqttQoS();
......@@ -105,7 +105,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
Boolean result = Tio.send(context, message);
logger.info("MQTT reconnect subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result);
pendingSubscription.startRetransmitTimer(executor, (msg) -> Tio.send(context, message));
subscriptionManager.addPaddingSubscribe(messageId, pendingSubscription);
clientStore.addPaddingSubscribe(messageId, pendingSubscription);
}
}
}
......@@ -255,14 +255,14 @@ public final class MqttClientCreator {
// 默认为:MICA-MQTT- 前缀和 36进制的纳秒数
this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
}
MqttClientSubscriptionManager subscriptionManager = new MqttClientSubscriptionManager();
MqttClientStore clientStore = new MqttClientStore();
// 客户端处理器
CountDownLatch connLatch = new CountDownLatch(1);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
MqttClientProcessor processor = new DefaultMqttClientProcessor(subscriptionManager, connLatch, executor);
MqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, connLatch, executor);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this.bufferAllocator, Objects.requireNonNull(processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this, subscriptionManager, executor);
ClientAioListener clientAioListener = new MqttClientAioListener(this, clientStore, executor);
// 3. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
......@@ -277,7 +277,7 @@ public final class MqttClientCreator {
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
// 5. 等待连接成功之后继续
connLatch.await();
return new MqttClient(tioClient, this, context, subscriptionManager, executor);
return new MqttClient(tioClient, this, context, clientStore, executor);
}
}
......@@ -28,7 +28,7 @@ import java.util.*;
*
* @author L.cm
*/
final class MqttClientSubscriptionManager {
final class MqttClientStore {
/**
* 订阅的数据承载
......@@ -117,4 +117,11 @@ final class MqttClientSubscriptionManager {
return pendingQos2PublishData.remove(messageId);
}
public void clean() {
subscriptions.clear();
pendingSubscriptions.clear();
pendingUnSubscriptions.clear();
pendingPublishData.clear();
pendingQos2PublishData.clear();
}
}
......@@ -28,6 +28,7 @@ import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -40,21 +41,18 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class DefaultMqttServerProcessor implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
private final IMqttAuthHandler authHandler;
private final IMqttMessageIdGenerator messageIdGenerator;
private final IMqttPublishManager publishManager;
private final IMqttSubManager subManager;
private final IMqttSubscribeStore subscribeStore;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttServerProcessor(IMqttAuthHandler authHandler,
IMqttSubManager subManager,
IMqttPublishManager publishManager,
IMqttMessageIdGenerator messageIdGenerator,
IMqttSubscribeStore subscribeStore,
ScheduledThreadPoolExecutor executor) {
IMqttSubManager subManager,
IMqttPublishManager publishManager,
IMqttSubscribeStore subscribeStore,
ScheduledThreadPoolExecutor executor) {
this.authHandler = authHandler;
this.subManager = subManager;
this.messageIdGenerator = messageIdGenerator;
this.publishManager = publishManager;
this.subscribeStore = subscribeStore;
this.executor = executor;
......@@ -79,7 +77,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
// 3. 设置 clientId
context.setBsId(clientId);
Tio.bindBsId(context, clientId);
// 4. 返回 ack
// 4. TODO 存储遗嘱消息
// 5. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
......@@ -210,7 +209,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
.packetId(messageId)
.build();
Tio.send(context, subAckMessage);
// 4. 发送保留消息
// 4. TODO 发送保留消息
}
@Override
......@@ -239,7 +238,6 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
public void processDisConnect(ChannelContext context) {
String clientId = context.getBsId();
logger.info("DisConnect - clientId: {}", clientId);
Tio.unbindBsId(context);
Tio.close(context, "Mqtt DisConnect");
}
......@@ -251,9 +249,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
*/
private void invokeListenerForPublish(MqttQoS mqttQoS, String topicName, MqttPublishMessage message) {
List<MqttSubscription> subscriptionList = subManager.getMatchedSubscription(topicName, mqttQoS);
final ByteBuffer payload = message.payload();
subscriptionList.forEach(subscription -> {
MqttMessageListener listener = subscription.getListener();
listener.onMessage(topicName, message.payload());
payload.rewind();
listener.onMessage(topicName, payload);
});
}
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -29,6 +30,11 @@ import org.tio.core.Tio;
*/
public class MqttServerAioListener extends DefaultAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
private final IMqttSubscribeStore subscribeStore;
public MqttServerAioListener(IMqttSubscribeStore subscribeStore) {
this.subscribeStore = subscribeStore;
}
@Override
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
......@@ -41,6 +47,11 @@ public class MqttServerAioListener extends DefaultAioListener {
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
String clientId = context.getBsId();
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
// 对于异常,处理遗嘱消息
if (throwable != null) {
// TODO 遗嘱消息处理
}
subscribeStore.remove(clientId);
Tio.unbindBsId(context);
}
......
......@@ -229,12 +229,12 @@ public class MqttServerCreator {
if (this.authHandler == null) {
this.authHandler = new DefaultMqttAuthHandler();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(authHandler, subManager, publishManager, messageIdGenerator, subscribeStore, executor);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this.authHandler, this.subManager, this.publishManager, this.subscribeStore, executor);
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, serverProcessor);
// 监听
ServerAioListener listener = new MqttServerAioListener();
ServerAioListener listener = new MqttServerAioListener(this.subscribeStore);
// 配置
ServerTioConfig config = new ServerTioConfig(this.name, handler, listener);
// 设置心跳 timeout
......
......@@ -44,6 +44,13 @@ public interface IMqttSubscribeStore {
*/
void remove(String clientId, String topicFilter);
/**
* 删除订阅
*
* @param clientId 客户端 Id
*/
void remove(String clientId);
/**
* 查找订阅信息
*
......
......@@ -45,6 +45,10 @@ public class MqttClientTest {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
});
client.subQos0("/#", (topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
});
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
......
......@@ -33,6 +33,11 @@ public class MqttSubscribeStore implements IMqttSubscribeStore {
map.remove(topicFilter);
}
@Override
public void remove(String clientId) {
data.remove(clientId);
}
@Override
public List<SubscribeStore> search(String clientId, String topicName) {
List<SubscribeStore> list = new ArrayList<>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册