diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index e5cc49d15bbe20ed1227174601106ba8f7dfdfc0..d9983b4233345d726b1cfa3ff27f01fd539406a6 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -29,7 +29,7 @@ import org.tio.utils.timer.TimerTaskService; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; /** @@ -44,7 +44,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor { private final IMqttClientConnectListener connectListener; private final IMqttClientMessageIdGenerator messageIdGenerator; private final TimerTaskService taskService; - private final ThreadPoolExecutor executor; + private final ExecutorService executor; public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) { this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize(); diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java index da4c820d70ec7a775ceffe81631e77969e39ebd4..b0db8d6f815b0d3de74ae628debd14e5a6aaea26 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java @@ -28,7 +28,7 @@ import org.tio.core.Tio; import org.tio.utils.hutool.StrUtil; import java.nio.charset.StandardCharsets; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; /** * mqtt 客户端监听器 @@ -39,7 +39,7 @@ public class MqttClientAioListener extends DefaultTioClientListener { private static final Logger logger = LoggerFactory.getLogger(MqttClientAioListener.class); private final MqttClientCreator clientCreator; private final IMqttClientConnectListener connectListener; - private final ThreadPoolExecutor executor; + private final ExecutorService executor; public MqttClientAioListener(MqttClientCreator clientCreator) { this.clientCreator = clientCreator; diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java index acd8c458bd660209e3d14c7ebc7537477e816703..9bb24ef247c09599e1167ae6af2d4893985c4777 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java @@ -27,6 +27,7 @@ import org.tio.client.intf.TioClientHandler; import org.tio.client.intf.TioClientListener; import org.tio.core.TioConfig; import org.tio.core.ssl.SslConfig; +import org.tio.utils.Threads; import org.tio.utils.buffer.ByteBufferAllocator; import org.tio.utils.hutool.StrUtil; import org.tio.utils.thread.pool.SynThreadPoolExecutor; @@ -34,6 +35,7 @@ import org.tio.utils.timer.DefaultTimerTaskService; import org.tio.utils.timer.TimerTaskService; import java.io.InputStream; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Consumer; @@ -170,7 +172,7 @@ public final class MqttClientCreator { /** * mqttExecutor */ - private ThreadPoolExecutor mqttExecutor; + private ExecutorService mqttExecutor; /** * taskService */ @@ -296,7 +298,7 @@ public final class MqttClientCreator { return groupExecutor; } - public ThreadPoolExecutor getMqttExecutor() { + public ExecutorService getMqttExecutor() { return mqttExecutor; } @@ -510,11 +512,11 @@ public final class MqttClientCreator { } // tioExecutor if (this.tioExecutor == null) { - this.tioExecutor = ThreadUtil.getTioExecutor(3); + this.tioExecutor = Threads.getTioExecutor(3); } // groupExecutor if (this.groupExecutor == null) { - this.groupExecutor = ThreadUtil.getGroupExecutor(2); + this.groupExecutor = Threads.getGroupExecutor(2); } // mqttExecutor if (this.mqttExecutor == null) { diff --git a/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java b/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java index fe421570cd7e742710d972fac095d3e1ebeda906..8619374df3b33b7ee80bbac62afc2a4ce55a8e45 100644 --- a/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java +++ b/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java @@ -21,6 +21,7 @@ import org.tio.utils.thread.pool.DefaultThreadFactory; import org.tio.utils.thread.pool.SynThreadPoolExecutor; import org.tio.utils.thread.pool.TioCallerRunsPolicy; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -57,13 +58,7 @@ public final class ThreadUtil { * @return ThreadPoolExecutor */ public static ThreadPoolExecutor getGroupExecutor(int groupPoolSize) { - String threadName = "tio-group"; - DefaultThreadFactory threadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY); - LinkedBlockingQueue runnableQueue = new LinkedBlockingQueue<>(); - ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(groupPoolSize, groupPoolSize, - Threads.KEEP_ALIVE_TIME, TimeUnit.SECONDS, runnableQueue, threadFactory, new TioCallerRunsPolicy()); - groupExecutor.prestartCoreThread(); - return groupExecutor; + return Threads.getGroupExecutor(groupPoolSize); } /** @@ -73,13 +68,7 @@ public final class ThreadUtil { * @return SynThreadPoolExecutor */ public static SynThreadPoolExecutor getTioExecutor(int tioPoolSize) { - String threadName = "tio-worker"; - LinkedBlockingQueue runnableQueue = new LinkedBlockingQueue<>(); - DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY); - SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(tioPoolSize, tioPoolSize, - Threads.KEEP_ALIVE_TIME, runnableQueue, defaultThreadFactory, new TioCallerRunsPolicy()); - tioExecutor.prestartCoreThread(); - return tioExecutor; + return Threads.getTioExecutor(tioPoolSize); } /** @@ -88,7 +77,7 @@ public final class ThreadUtil { * @param poolSize 业务线程池大小 * @return ThreadPoolExecutor */ - public static ThreadPoolExecutor getMqttExecutor(int poolSize) { + public static ExecutorService getMqttExecutor(int poolSize) { String threadName = "mqtt-worker"; LinkedBlockingQueue runnableQueue = new LinkedBlockingQueue<>(); DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY); diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java index 9a36eaa680bab2197b120d146028933245f0ebeb..cbd4e4afa9bd18986a6ad791f601603a5cb1e749 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java @@ -31,7 +31,7 @@ import org.tio.server.DefaultTioServerListener; import org.tio.utils.hutool.StrUtil; import java.io.IOException; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; /** * mqtt 服务监听 @@ -45,9 +45,9 @@ public class MqttServerAioListener extends DefaultTioServerListener { private final IMqttMessageDispatcher messageDispatcher; private final IMqttConnectStatusListener connectStatusListener; private final MqttMessageInterceptors messageInterceptors; - private final ThreadPoolExecutor executor; + private final ExecutorService executor; - public MqttServerAioListener(MqttServerCreator serverCreator, ThreadPoolExecutor executor) { + public MqttServerAioListener(MqttServerCreator serverCreator, ExecutorService executor) { this.messageStore = serverCreator.getMessageStore(); this.sessionManager = serverCreator.getSessionManager(); this.messageDispatcher = serverCreator.getMessageDispatcher(); diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java index 3b809c19556b7ae55dafce3cf9d2619f51e93120..0f135ba39d74f6a742e0b73fb374cb9c18b1706b 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java @@ -57,7 +57,7 @@ import org.tio.utils.timer.TimerTaskService; import java.io.InputStream; import java.lang.management.ManagementFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; /** @@ -202,6 +202,10 @@ public class MqttServerCreator { * taskService */ private TimerTaskService taskService; + /** + * 业务消费线程 + */ + private ExecutorService mqttExecutor; /** * json 处理器 */ @@ -533,6 +537,15 @@ public class MqttServerCreator { return this; } + public ExecutorService getMqttExecutor() { + return mqttExecutor; + } + + public MqttServerCreator mqttExecutor(ExecutorService mqttExecutor) { + this.mqttExecutor = mqttExecutor; + return this; + } + public JsonAdapter getJsonAdapter() { return jsonAdapter; } @@ -567,7 +580,9 @@ public class MqttServerCreator { this.taskService = new DefaultTimerTaskService(200L, 60); } // 业务线程池 - ThreadPoolExecutor mqttExecutor = ThreadUtil.getMqttExecutor(Threads.MAX_POOL_SIZE_FOR_TIO); + if (this.mqttExecutor == null) { + this.mqttExecutor = ThreadUtil.getMqttExecutor(Threads.MAX_POOL_SIZE_FOR_TIO); + } // AckService DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, this.taskService, mqttExecutor); // 1. 处理消息 diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 6297feeb279c266c5b719a3f323f9843c5363ca1..f0aec2598c97d33163ec299d61e5595f5422652d 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -45,7 +45,7 @@ import org.tio.utils.timer.TimerTaskService; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; /** * mqtt broker 处理器 @@ -75,11 +75,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { private final IMqttSessionListener sessionListener; private final IMqttMessageListener messageListener; private final TimerTaskService taskService; - private final ThreadPoolExecutor executor; + private final ExecutorService executor; public DefaultMqttServerProcessor(MqttServerCreator serverCreator, TimerTaskService taskService, - ThreadPoolExecutor executor) { + ExecutorService executor) { this.serverCreator = serverCreator; this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? DEFAULT_HEARTBEAT_TIMEOUT : serverCreator.getHeartbeatTimeout(); this.messageStore = serverCreator.getMessageStore(); diff --git a/pom.xml b/pom.xml index 2986a52ecf4bf52aec2cbfe1b7e91fb7e0778ee1..e7a1bd84db021dd290adebc0e731e6c76edb4c5c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ - 2.2.4 + 2.2.5-SNAPSHOT 1.8 UTF-8