提交 4a651aa6 编写于 作者: 如梦技术's avatar 如梦技术 🐛

🔖 mqtt client 订阅 IMqttClientMessageListener 添加 onSubscribed 方法。

上级 0070cc38
......@@ -144,7 +144,13 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
}
paddingSubscribe.onSubAckReceived();
clientSession.removePaddingSubscribe(messageId);
clientSession.addSubscription(paddingSubscribe.toSubscription());
MqttClientSubscription subscription = paddingSubscribe.toSubscription();
clientSession.addSubscription(subscription);
try {
subscription.getListener().onSubscribed(topicFilter, subscription.getMqttQoS());
} catch (Throwable e) {
logger.error("MQTT Topic:{} subscribed onSubscribed event error.", topicFilter);
}
}
@Override
......
......@@ -16,6 +16,8 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import java.nio.ByteBuffer;
/**
......@@ -26,6 +28,16 @@ import java.nio.ByteBuffer;
@FunctionalInterface
public interface IMqttClientMessageListener {
/**
* 订阅成功之后的事件
*
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
*/
default void onSubscribed(String topicFilter, MqttQoS mqttQoS) {
}
/**
* 监听到消息
*
......
......@@ -25,6 +25,7 @@ import org.tio.client.TioClient;
import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
......@@ -98,6 +99,8 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) {
Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null.");
Objects.requireNonNull(listener, "MQTT subscribe listener is null.");
// 先判断是否已经订阅过,重复订阅,直接跳出
boolean subscribed = clientSession.isSubscribed(topicFilter, mqttQoS, listener);
if (!subscribed) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册