diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index cf9bea9c827a270edbcbdb60bc9b31ebc2b3adfc..4773336609ef4bd3b02374566f3b4acde27f77a2 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -144,7 +144,13 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { } paddingSubscribe.onSubAckReceived(); clientSession.removePaddingSubscribe(messageId); - clientSession.addSubscription(paddingSubscribe.toSubscription()); + MqttClientSubscription subscription = paddingSubscribe.toSubscription(); + clientSession.addSubscription(subscription); + try { + subscription.getListener().onSubscribed(topicFilter, subscription.getMqttQoS()); + } catch (Throwable e) { + logger.error("MQTT Topic:{} subscribed onSubscribed event error.", topicFilter); + } } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientMessageListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientMessageListener.java index 81e34807bf8994bb27617b7fd6b027e2f0395522..7093d93b2d46506093a752fb765bd1d186164fce 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientMessageListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientMessageListener.java @@ -16,6 +16,8 @@ package net.dreamlu.iot.mqtt.core.client; +import net.dreamlu.iot.mqtt.codec.MqttQoS; + import java.nio.ByteBuffer; /** @@ -26,6 +28,16 @@ import java.nio.ByteBuffer; @FunctionalInterface public interface IMqttClientMessageListener { + /** + * 订阅成功之后的事件 + * + * @param topicFilter topicFilter + * @param mqttQoS MqttQoS + */ + default void onSubscribed(String topicFilter, MqttQoS mqttQoS) { + + } + /** * 监听到消息 * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index 2e1acf2f03c860e67d43189df4a41b50c9004592..fe52d549449256b6f50dc81648cbbb14f453db64 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -25,6 +25,7 @@ import org.tio.client.TioClient; import org.tio.core.Tio; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.ScheduledThreadPoolExecutor; /** @@ -98,6 +99,8 @@ public final class MqttClient { * @return MqttClient */ public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) { + Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null."); + Objects.requireNonNull(listener, "MQTT subscribe listener is null."); // 先判断是否已经订阅过,重复订阅,直接跳出 boolean subscribed = clientSession.isSubscribed(topicFilter, mqttQoS, listener); if (!subscribed) {