提交 eb66a55f 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mica-mqttx-client 支持 `reconnect(String ip, int port)` 转移到其他服务,订阅保留,连接成功时自动重新订阅。

上级 499b88f2
......@@ -75,8 +75,8 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
}
// 2. 发布连接通知
publishConnectEvent(context);
// 3. 如果 session 不存在重连时发送重新订阅
if (!connAckVariableHeader.isSessionPresent()) {
// 3. 如果 session 不存在重连时发送重新订阅,更改 ip、端口之后需要重新发送订阅
if (!connAckVariableHeader.isSessionPresent() || MqttClient.isNeedReSub(context)) {
reSendSubscription(context);
}
break;
......
......@@ -45,6 +45,10 @@ import java.util.stream.Collectors;
*/
public final class MqttClient {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
/**
* 是否需要重新订阅
*/
private static final String MQTT_NEED_RE_SUB = "MQTT_NEED_RE_SUB";
private final TioClient tioClient;
private final MqttClientCreator config;
private final TioClientConfig clientTioConfig;
......@@ -447,6 +451,56 @@ public final class MqttClient {
}
}
/**
* 重连到新的服务端节点
*
* @param ip ip
* @param port port
* @return 是否成功
*/
public boolean reconnect(String ip, int port) {
return reconnect(new Node(ip, port));
}
/**
* 重连到新的服务端节点
*
* @param serverNode Node
* @return 是否成功
*/
public boolean reconnect(Node serverNode) {
// 更新 ip 和端口
this.config.ip(serverNode.getIp()).port(serverNode.getPort());
// 获取老的
ClientChannelContext oldContext = getContext();
if (oldContext != null) {
Tio.remove(context, "切换服务地址:" + serverNode);
}
try {
this.context = tioClient.connect(serverNode, config.getTimeout());
this.context.set(MQTT_NEED_RE_SUB, (byte) 1);
return true;
} catch (Exception e) {
logger.error("mqtt client reconnect error", e);
}
return false;
}
/**
* 是否需要重新订阅
*
* @param context ChannelContext
* @return 是否需要重新订阅
*/
public static boolean isNeedReSub(ChannelContext context) {
if (context.containsKey(MQTT_NEED_RE_SUB)) {
context.remove(MQTT_NEED_RE_SUB);
return true;
}
return false;
}
/**
* 断开 mqtt 连接
*
......
......@@ -235,6 +235,17 @@ public class MqttClientTemplate implements SmartInitializingSingleton, Disposabl
client.reconnect();
}
/**
* 重连到新的服务端节点
*
* @param ip ip
* @param port port
* @return 是否成功
*/
public boolean reconnect(String ip, int port) {
return client.reconnect(ip, port);
}
/**
* 断开 mqtt 连接
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册