提交 ed2021c7 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt client 添加连接监听。

上级 381a0801
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.client;
import org.tio.core.ChannelContext;
/**
* mqtt 客户端连接监听
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttClientConnectListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param isReconnect 是否重连
*/
void onConnected(ChannelContext context, boolean isReconnect);
}
......@@ -16,10 +16,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.DefaultClientAioListener;
......@@ -42,6 +39,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
private final MqttClientCreator clientConfig;
private final MqttWillMessage willMessage;
private final MqttClientStore clientStore;
private final IMqttClientConnectListener connectListener;
private final ScheduledThreadPoolExecutor executor;
public MqttClientAioListener(MqttClientCreator clientConfig,
......@@ -50,6 +48,7 @@ public class MqttClientAioListener extends DefaultClientAioListener {
this.clientConfig = Objects.requireNonNull(clientConfig);
this.willMessage = clientConfig.getWillMessage();
this.clientStore = clientStore;
this.connectListener = clientConfig.getConnectListener();
this.executor = executor;
}
......@@ -83,13 +82,26 @@ public class MqttClientAioListener extends DefaultClientAioListener {
builder.properties(properties);
}
// 5. 发送 mqtt 连接消息
Boolean result = Tio.send(context, builder.build());
logger.info("MqttClient reconnect send connect result:{}", result);
sendConnectMessage(context, builder.build());
// 6. 重连时发送重新订阅
reSendSubscription(context);
// 7. 发布连接通知
publishConnectEvent(context, isReconnect);
}
}
/**
* 发送连接的消息
*
* @param context ChannelContext
* @param message MqttMessage
*/
private static void sendConnectMessage(ChannelContext context, MqttMessage message) {
// 5. 发送 mqtt 连接消息
Boolean result = Tio.send(context, message);
logger.info("MqttClient reconnect send connect result:{}", result);
}
private void reSendSubscription(ChannelContext context) {
List<MqttClientSubscription> subscriptionList = clientStore.getAndCleanSubscription();
for (MqttClientSubscription subscription : subscriptionList) {
......@@ -107,4 +119,16 @@ public class MqttClientAioListener extends DefaultClientAioListener {
clientStore.addPaddingSubscribe(messageId, pendingSubscription);
}
}
private void publishConnectEvent(ChannelContext context, boolean isReconnect) {
// 先判断是否配置监听
if (connectListener == null) {
return;
}
try {
connectListener.onConnected(context, isReconnect);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -117,6 +117,10 @@ public final class MqttClientCreator {
* ByteBuffer Allocator,支持堆内存和堆外内存,默认为:堆内存
*/
private ByteBufferAllocator bufferAllocator = ByteBufferAllocator.HEAP;
/**
* 连接监听器
*/
private IMqttClientConnectListener connectListener;
public String getName() {
return name;
......@@ -190,6 +194,10 @@ public final class MqttClientCreator {
return bufferAllocator;
}
public IMqttClientConnectListener getConnectListener() {
return connectListener;
}
public MqttClientCreator name(String name) {
this.name = name;
return this;
......@@ -286,6 +294,11 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator connectListener(IMqttClientConnectListener connectListener) {
this.connectListener = connectListener;
return this;
}
public MqttClient connect() {
// 1. 生成 默认的 clientId
String clientId = getClientId();
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.spring.client;
import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
import net.dreamlu.iot.mqtt.core.client.MqttWillMessage;
......@@ -44,6 +45,7 @@ public class MqttClientConfiguration {
@Bean
public MqttClientCreator mqttClientCreator(MqttClientProperties properties,
ObjectProvider<IMqttClientConnectListener> clientConnectListenerObjectProvider,
ObjectProvider<MqttClientCustomizer> customizers) {
MqttClientCreator clientCreator = MqttClient.create()
.name(properties.getName())
......@@ -79,6 +81,8 @@ public class MqttClientConfiguration {
}
clientCreator.willMessage(builder.build());
}
// 配置客户端链接监听器
clientConnectListenerObjectProvider.ifAvailable(clientCreator::connectListener);
// 自定义处理
customizers.ifAvailable((customizer) -> customizer.customize(clientCreator));
return clientCreator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册