提交 6d5f1c60 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mica-mqtt client 优化默认线程池。

上级 d5b582da
......@@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.util.ThreadUtil;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
......@@ -29,8 +30,10 @@ import org.tio.core.Node;
import org.tio.core.ssl.SslConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
......@@ -149,6 +152,14 @@ public final class MqttClientCreator {
* 是否开启监控,默认:false 不开启,节省内存
*/
private boolean statEnable = false;
/**
* tioExecutor
*/
private SynThreadPoolExecutor tioExecutor;
/**
* groupExecutor
*/
private ThreadPoolExecutor groupExecutor;
public String getName() {
return name;
......@@ -254,6 +265,14 @@ public final class MqttClientCreator {
return statEnable;
}
public SynThreadPoolExecutor getTioExecutor() {
return tioExecutor;
}
public ThreadPoolExecutor getGroupExecutor() {
return groupExecutor;
}
public MqttClientCreator name(String name) {
this.name = name;
return this;
......@@ -394,6 +413,16 @@ public final class MqttClientCreator {
return this;
}
public MqttClientCreator tioExecutor(SynThreadPoolExecutor tioExecutor) {
this.tioExecutor = tioExecutor;
return this;
}
public MqttClientCreator groupExecutor(ThreadPoolExecutor groupExecutor) {
this.groupExecutor = groupExecutor;
return this;
}
public MqttClient connect() {
// 1. 生成 默认的 clientId
String clientId = getClientId();
......@@ -409,6 +438,14 @@ public final class MqttClientCreator {
if (this.messageIdGenerator == null) {
this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator();
}
// tioExecutor
if (this.tioExecutor == null) {
this.tioExecutor = ThreadUtil.getTioExecutor(2);
}
// groupExecutor
if (this.groupExecutor == null) {
this.groupExecutor = ThreadUtil.getGroupExecutor(1);
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttClient"));
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
// 4. 初始化 mqtt 处理器
......@@ -420,7 +457,7 @@ public final class MqttClientCreator {
reconnConf = new ReconnConf(this.reInterval, this.retryCount);
}
// 6. tioConfig
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf, tioExecutor, groupExecutor);
tioConfig.setName(this.name);
// 7. 心跳超时时间
tioConfig.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(this.keepAliveSecs));
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.util;
import org.tio.utils.Threads;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.thread.pool.TioCallerRunsPolicy;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* mqtt 线程工具类
*
* @author L.cm
*/
public final class ThreadUtil {
/**
* 获取 tio group 线程池
*
* @param groupPoolSize group 线程大小
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getGroupExecutor(int groupPoolSize) {
String threadName = "tio-group";
DefaultThreadFactory threadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(groupPoolSize, groupPoolSize,
Threads.KEEP_ALIVE_TIME, TimeUnit.SECONDS, runnableQueue, threadFactory, new TioCallerRunsPolicy());
groupExecutor.prestartCoreThread();
return groupExecutor;
}
/**
* 获取 getTioExecutor 线程池
*
* @param tioPoolSize tio 线程池大小
* @return SynThreadPoolExecutor
*/
public static SynThreadPoolExecutor getTioExecutor(int tioPoolSize) {
String threadName = "tio-worker";
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(tioPoolSize, tioPoolSize,
Threads.KEEP_ALIVE_TIME, runnableQueue, defaultThreadFactory, threadName, new TioCallerRunsPolicy());
tioExecutor.prestartCoreThread();
return tioExecutor;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册