提交 d5c903a4 编写于 作者: 浅梦2013's avatar 浅梦2013

mqtt ack 优化。

上级 77056161
......@@ -477,6 +477,10 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator ackService(long tickMs, int wheelSize) {
return ackService(new DefaultAckService(tickMs, wheelSize));
}
public MqttClientCreator tioConfigCustomize(Consumer<TioConfig> tioConfigCustomize) {
this.tioConfigCustomize = tioConfigCustomize;
return this;
......
......@@ -200,6 +200,11 @@ public class MqttServerCreator {
* 消息拦截器
*/
private final MqttMessageInterceptors messageInterceptors = new MqttMessageInterceptors();
/**
* ackService
*/
private AckService ackService;
public String getName() {
return name;
}
......@@ -519,6 +524,15 @@ public class MqttServerCreator {
return this;
}
public MqttServerCreator ackService(AckService ackService) {
this.ackService = ackService;
return this;
}
public MqttServerCreator ackService(long tickMs, int wheelSize) {
return ackService(new DefaultAckService(tickMs, wheelSize));
}
public MqttServer build() {
// 默认的节点名称,用于集群
if (StrUtil.isBlank(this.nodeName)) {
......@@ -539,10 +553,12 @@ public class MqttServerCreator {
if (this.connectStatusListener == null) {
this.connectStatusListener = new DefaultMqttConnectStatusListener();
}
if (this.ackService == null) {
this.ackService = new DefaultAckService();
}
// 业务线程池
ThreadPoolExecutor mqttExecutor = ThreadUtil.getMqttExecutor(Threads.MAX_POOL_SIZE_FOR_TIO);
// AckService
AckService ackService = new DefaultAckService();
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, ackService, mqttExecutor);
// 1. 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor);
......@@ -587,7 +603,7 @@ public class MqttServerCreator {
webServer = null;
}
// MqttServer
MqttServer mqttServer = new MqttServer(tioServer, webServer, this, ackService);
MqttServer mqttServer = new MqttServer(tioServer, webServer, this, this.ackService);
// 9. 如果是默认的消息转发器,设置 mqttServer
if (this.messageDispatcher instanceof AbstractMqttMessageDispatcher) {
((AbstractMqttMessageDispatcher) this.messageDispatcher).config(mqttServer);
......
......@@ -26,7 +26,11 @@ public class DefaultAckService implements AckService {
private final TimingWheelThread timingWheelThread;
public DefaultAckService() {
this(new SystemTimer(1000L, 60, "AckServiceExecutor"));
this(100L, 60);
}
public DefaultAckService(long tickMs, int wheelSize) {
this(new SystemTimer(tickMs, wheelSize, "DefaultTimerTaskService"));
}
public DefaultAckService(SystemTimer systemTimer) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册