提交 7ecd29a0 编写于 作者: 如梦技术's avatar 如梦技术 🐛

🐛 修复多个 mica mqtt client 消息id生成器隔离。

上级 87b2f868
......@@ -16,38 +16,23 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessageIdVariableHeader;
import java.util.concurrent.atomic.AtomicInteger;
/**
* mqtt 客户端的消息 id
* 默认的 mqtt 客户端的消息 id 生成器
*
* @author L.cm
*/
public enum MqttClientMessageId {
/**
* 实例
*/
INSTANCE(new AtomicInteger(1));
public final class DefaultMqttClientMessageIdGenerator implements IMqttClientMessageIdGenerator {
private final AtomicInteger value;
MqttClientMessageId(AtomicInteger value) {
this.value = value;
public DefaultMqttClientMessageIdGenerator() {
this.value = new AtomicInteger(1);
}
public int getMessageId() {
public int getId() {
this.value.compareAndSet(0xffff, 1);
return this.value.getAndIncrement();
}
public static int getId() {
return INSTANCE.getMessageId();
}
public static MqttMessageIdVariableHeader getVariableHeader() {
return MqttMessageIdVariableHeader.from(getId());
}
}
......@@ -42,6 +42,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private final int reSubscribeBatchSize;
private final IMqttClientSession clientSession;
private final IMqttClientConnectListener connectListener;
private final IMqttClientMessageIdGenerator messageIdGenerator;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator,
......@@ -49,6 +50,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
this.clientSession = mqttClientCreator.getClientSession();
this.connectListener = mqttClientCreator.getConnectListener();
this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
this.executor = executor;
}
......@@ -139,7 +141,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
List<MqttTopicSubscription> topicSubscriptionList = reSubscriptionList.stream()
.map(MqttClientSubscription::toTopicSubscription)
.collect(Collectors.toList());
int messageId = MqttClientMessageId.getId();
int messageId = messageIdGenerator.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscriptions(topicSubscriptionList)
.messageId(messageId)
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.client;
/**
* mqtt client messageId 生成器
*
* @author L.cm
*/
public interface IMqttClientMessageIdGenerator {
/**
* 获取 messageId
*
* @return messageId
*/
int getId();
}
......@@ -41,6 +41,7 @@ public final class MqttClient {
private final ClientChannelContext context;
private final IMqttClientSession clientSession;
private final ScheduledThreadPoolExecutor executor;
private final IMqttClientMessageIdGenerator messageIdGenerator;
public static MqttClientCreator create() {
return new MqttClientCreator();
......@@ -49,13 +50,13 @@ public final class MqttClient {
MqttClient(TioClient tioClient,
MqttClientCreator config,
ClientChannelContext context,
IMqttClientSession clientSession,
ScheduledThreadPoolExecutor executor) {
this.tioClient = tioClient;
this.config = config;
this.context = context;
this.clientSession = clientSession;
this.executor = executor;
this.clientSession = config.getClientSession();
this.messageIdGenerator = config.getMessageIdGenerator();
}
/**
......@@ -143,7 +144,7 @@ public final class MqttClient {
.map(MqttClientSubscription::toTopicSubscription)
.collect(Collectors.toList());
// 3. 没有订阅过
int messageId = MqttClientMessageId.getId();
int messageId = messageIdGenerator.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscriptions(topicSubscriptionList)
.messageId(messageId)
......@@ -173,7 +174,7 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient unSubscribe(List<String> topicFilters) {
int messageId = MqttClientMessageId.getId();
int messageId = messageIdGenerator.getId();
MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe()
.addTopicFilters(topicFilters)
.messageId(messageId)
......@@ -281,7 +282,7 @@ public final class MqttClient {
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? MqttClientMessageId.getId() : -1;
int messageId = isHighLevelQoS ? messageIdGenerator.getId() : -1;
if (payload == null) {
payload = ByteBuffer.allocate(0);
}
......
......@@ -142,6 +142,10 @@ public final class MqttClientCreator {
* 客户端 session
*/
private IMqttClientSession clientSession;
/**
* messageId 生成器
*/
private IMqttClientMessageIdGenerator messageIdGenerator;
public String getName() {
return name;
......@@ -239,6 +243,10 @@ public final class MqttClientCreator {
return clientSession;
}
public IMqttClientMessageIdGenerator getMessageIdGenerator() {
return messageIdGenerator;
}
public MqttClientCreator name(String name) {
this.name = name;
return this;
......@@ -365,6 +373,11 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator messageIdGenerator(IMqttClientMessageIdGenerator messageIdGenerator) {
this.messageIdGenerator = messageIdGenerator;
return this;
}
public MqttClient connect() {
// 1. 生成 默认的 clientId
String clientId = getClientId();
......@@ -376,30 +389,34 @@ public final class MqttClientCreator {
if (this.clientSession == null) {
this.clientSession = new DefaultMqttClientSession();
}
// 3. 消息id 生成器
if (this.messageIdGenerator == null) {
this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
// 3. 初始化 mqtt 处理器
// 4. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
ClientAioListener clientAioListener = new MqttClientAioListener(this);
// 4. 重连配置
// 5. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
reconnConf = new ReconnConf(this.reInterval, this.retryCount);
}
// 5. tioConfig
// 6. tioConfig
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
tioConfig.setName(this.name);
// 6. 心跳超时时间
// 7. 心跳超时时间
tioConfig.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(this.keepAliveSecs));
// 7. mqtt 消息最大长度
// 8. mqtt 消息最大长度
tioConfig.setReadBufferSize(this.readBufferSize);
// 8. tioClient
// 9. ssl 证书设置
tioConfig.setSslConfig(this.sslConfig);
// 10. tioClient
try {
// 9. ssl 证书设置
tioConfig.setSslConfig(this.sslConfig);
TioClient tioClient = new TioClient(tioConfig);
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
return new MqttClient(tioClient, this, context, this.clientSession, executor);
return new MqttClient(tioClient, this, context, executor);
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册